Skip to content

Commit ce21c24

Browse files
msft-paddy14vazoisbadrishc
authored
Convert sync over async network handling to async (#835)
* test garnet benchmark * Revert "test garnet benchmark" This reverts commit 1b3dca6. * add async socketprocessing * resolve conflict * fix async socket processing * remove bad files * fix formatting * fix exception handling * 1. convert Task->ValueTask for netowkr processing 2. Refactor continuewith to async/await pattern * add condition await * add bdn test for networking * add async in test * remove visibility changes for testing * add bdn test for networking * add async in test * fix tests for generic path * remove visibility changes for testing * fix tests for generic path * fix formatting * fix formatting * rewrite network test * rewrite network test * set max/min threads * set max/min threads * fix formatting * fix formatting * fix naming violations * remove dead code * formatting * replace with do while * remove networking test in BDN * address formatting comments * remove bad include * remove more dead code * use new NetworkRecive method for benchmark * Adjust for async mdoe * Change method names for clairty * update slowconsume to suync * fix formatting * remove tls cert copy * fix formatting * remove whitespace * revert names of tests * bump version and bdn fixes * formatting fixes * revert unsafe class changes * fix comments * segregate TLS and non TLS paths * separate based on TLS and no-TLS * nit * fix bdn --------- Co-authored-by: Vasileios Zois <[email protected]> Co-authored-by: Badrish Chandramouli <[email protected]>
1 parent 9fb9c07 commit ce21c24

File tree

7 files changed

+117
-70
lines changed

7 files changed

+117
-70
lines changed

Version.props

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<Project>
22
<!-- Versioning property for builds and packages -->
33
<PropertyGroup>
4-
<VersionPrefix>1.0.48</VersionPrefix>
4+
<VersionPrefix>1.0.49</VersionPrefix>
55
</PropertyGroup>
66
</Project>

benchmark/BDN.benchmark/Embedded/EmbeddedNetworkHandler.cs

+10-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using System.Diagnostics;
6+
using System.Threading.Tasks;
67
using Garnet.common;
78
using Garnet.networking;
89
using Microsoft.Extensions.Logging;
@@ -11,8 +12,11 @@ namespace Embedded.server
1112
{
1213
internal class EmbeddedNetworkHandler : NetworkHandler<GarnetServerEmbedded, EmbeddedNetworkSender>
1314
{
15+
readonly bool useTLS;
16+
1417
public EmbeddedNetworkHandler(GarnetServerEmbedded serverHook, EmbeddedNetworkSender networkSender, NetworkBufferSettings networkBufferSettings, LimitedFixedBufferPool networkPool, bool useTLS, IMessageConsumer messageConsumer = null, ILogger logger = null) : base(serverHook, networkSender, networkBufferSettings, networkPool, useTLS, messageConsumer, logger)
1518
{
19+
this.useTLS = useTLS;
1620
}
1721

1822
public override string RemoteEndpointName => throw new NotImplementedException();
@@ -24,14 +28,16 @@ public override void Dispose()
2428

2529
public override bool TryClose() => throw new NotImplementedException();
2630

27-
public unsafe void Send(Request request)
31+
public async ValueTask Send(Request request)
2832
{
2933
networkReceiveBuffer = request.buffer;
30-
networkReceiveBufferPtr = request.bufferPtr;
34+
unsafe { networkReceiveBufferPtr = request.bufferPtr; }
3135

32-
OnNetworkReceive(request.buffer.Length);
36+
if (useTLS)
37+
await OnNetworkReceiveWithTLSAsync(request.buffer.Length);
38+
else
39+
OnNetworkReceiveWithoutTLS(request.buffer.Length);
3340

34-
// We should have consumed the entire buffer
3541
Debug.Assert(networkBytesRead == 0);
3642
Debug.Assert(networkReadHead == 0);
3743
}

benchmark/BDN.benchmark/Network/BasicOperations.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ namespace BDN.benchmark.Network
1010
/// Benchmark for BasicOperations
1111
/// </summary>
1212
[MemoryDiagnoser]
13-
public unsafe class BasicOperations : NetworkBase
13+
public class BasicOperations : NetworkBase
1414
{
1515
static ReadOnlySpan<byte> INLINE_PING => "PING\r\n"u8;
1616
Request ping;
@@ -22,9 +22,9 @@ public override void GlobalSetup()
2222
}
2323

2424
[Benchmark]
25-
public void InlinePing()
25+
public async ValueTask InlinePing()
2626
{
27-
Send(ping);
27+
await Send(ping);
2828
}
2929
}
3030
}

benchmark/BDN.benchmark/Network/NetworkBase.cs

+1-4
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,7 @@ public virtual void GlobalCleanup()
6464
server.Dispose();
6565
}
6666

67-
protected void Send(Request request)
68-
{
69-
networkHandler.Send(request);
70-
}
67+
protected ValueTask Send(Request request) => networkHandler.Send(request);
7168

7269
protected unsafe void SetupOperation(ref Request request, ReadOnlySpan<byte> operation, int batchSize = batchSize)
7370
{

benchmark/BDN.benchmark/Network/RawStringOperations.cs

+21-21
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ namespace BDN.benchmark.Network
1010
/// Benchmark for RawStringOperations
1111
/// </summary>
1212
[MemoryDiagnoser]
13-
public unsafe class RawStringOperations : NetworkBase
13+
public class RawStringOperations : NetworkBase
1414
{
1515
static ReadOnlySpan<byte> SET => "*3\r\n$3\r\nSET\r\n$1\r\na\r\n$1\r\na\r\n"u8;
1616
Request set;
@@ -65,63 +65,63 @@ public override void GlobalSetup()
6565
}
6666

6767
[Benchmark]
68-
public void Set()
68+
public async ValueTask Set()
6969
{
70-
Send(set);
70+
await Send(set);
7171
}
7272

7373
[Benchmark]
74-
public void SetEx()
74+
public async ValueTask SetEx()
7575
{
76-
Send(setex);
76+
await Send(setex);
7777
}
7878

7979
[Benchmark]
80-
public void SetNx()
80+
public async ValueTask SetNx()
8181
{
82-
Send(setnx);
82+
await Send(setnx);
8383
}
8484

8585
[Benchmark]
86-
public void SetXx()
86+
public async ValueTask SetXx()
8787
{
88-
Send(setxx);
88+
await Send(setxx);
8989
}
9090

9191
[Benchmark]
92-
public void GetFound()
92+
public async ValueTask GetFound()
9393
{
94-
Send(getf);
94+
await Send(getf);
9595
}
9696

9797
[Benchmark]
98-
public void GetNotFound()
98+
public async ValueTask GetNotFound()
9999
{
100-
Send(getnf);
100+
await Send(getnf);
101101
}
102102

103103
[Benchmark]
104-
public void Increment()
104+
public async ValueTask Increment()
105105
{
106-
Send(incr);
106+
await Send(incr);
107107
}
108108

109109
[Benchmark]
110-
public void Decrement()
110+
public async ValueTask Decrement()
111111
{
112-
Send(decr);
112+
await Send(decr);
113113
}
114114

115115
[Benchmark]
116-
public void IncrementBy()
116+
public async ValueTask IncrementBy()
117117
{
118-
Send(incrby);
118+
await Send(incrby);
119119
}
120120

121121
[Benchmark]
122-
public void DecrementBy()
122+
public async ValueTask DecrementBy()
123123
{
124-
Send(decrby);
124+
await Send(decrby);
125125
}
126126
}
127127
}

libs/common/Networking/NetworkHandler.cs

+25-22
Original file line numberDiff line numberDiff line change
@@ -267,39 +267,40 @@ async Task AuthenticateAsClientAsync(SslClientAuthenticationOptions sslClientOpt
267267
}
268268
}
269269

270+
public unsafe void OnNetworkReceiveWithoutTLS(int bytesTransferred)
271+
{
272+
networkBytesRead += bytesTransferred;
273+
transportReceiveBuffer = networkReceiveBuffer;
274+
transportReceiveBufferPtr = networkReceiveBufferPtr;
275+
transportBytesRead = networkBytesRead;
276+
277+
// Process non-TLS code on the synchronous thread
278+
Process();
279+
280+
EndTransformNetworkToTransport();
281+
UpdateNetworkBuffers();
282+
}
283+
270284
/// <summary>
271285
/// On network receive
272286
/// </summary>
273287
/// <param name="bytesTransferred">Number of bytes transferred</param>
274-
public unsafe void OnNetworkReceive(int bytesTransferred)
288+
public async ValueTask OnNetworkReceiveWithTLSAsync(int bytesTransferred)
275289
{
276290
// Wait for SslStream async processing to complete, if any (e.g., authentication phase)
277291
while (readerStatus == TlsReaderStatus.Active)
278-
expectingData.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false).GetAwaiter().GetResult();
292+
await expectingData.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false);
279293

280294
// Increment network bytes read
281295
networkBytesRead += bytesTransferred;
282296

283297
switch (readerStatus)
284298
{
285299
case TlsReaderStatus.Rest:
286-
// Synchronously try to process the received data
287-
if (sslStream == null)
288-
{
289-
transportReceiveBuffer = networkReceiveBuffer;
290-
transportReceiveBufferPtr = networkReceiveBufferPtr;
291-
transportBytesRead = networkBytesRead;
292-
293-
// We do not have an active read task, so we will process on the network thread
294-
Process();
295-
}
296-
else
297-
{
298-
readerStatus = TlsReaderStatus.Active;
299-
Read();
300-
while (readerStatus == TlsReaderStatus.Active)
301-
expectingData.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false).GetAwaiter().GetResult();
302-
}
300+
readerStatus = TlsReaderStatus.Active;
301+
Read();
302+
while (readerStatus == TlsReaderStatus.Active)
303+
await expectingData.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false);
303304
break;
304305
case TlsReaderStatus.Waiting:
305306
// We have a ReadAsync task waiting for new data, set it to active status
@@ -309,17 +310,19 @@ public unsafe void OnNetworkReceive(int bytesTransferred)
309310
_ = receivedData.Release();
310311

311312
while (readerStatus == TlsReaderStatus.Active)
312-
expectingData.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false).GetAwaiter().GetResult();
313+
await expectingData.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false);
313314
break;
314315
default:
315316
ThrowInvalidOperationException($"Unexpected reader status {readerStatus}");
316317
break;
317318
}
318319

319320
Debug.Assert(readerStatus != TlsReaderStatus.Active);
321+
UpdateNetworkBuffers();
322+
}
320323

321-
EndTransformNetworkToTransport();
322-
324+
void UpdateNetworkBuffers()
325+
{
323326
// Shift network buffer after processing is done
324327
if (networkReadHead > 0)
325328
ShiftNetworkReceiveBuffer();

libs/common/Networking/TcpNetworkHandlerBase.cs

+56-15
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ public TcpNetworkHandlerBase(TServerHook serverHook, TNetworkSender networkSende
3939

4040
remoteEndpoint = socket.RemoteEndPoint is IPEndPoint remote ? $"{remote.Address}:{remote.Port}" : "";
4141
localEndpoint = socket.LocalEndPoint is IPEndPoint local ? $"{local.Address}:{local.Port}" : "";
42-
4342
AllocateNetworkReceiveBuffer();
4443
}
4544

@@ -52,28 +51,28 @@ public TcpNetworkHandlerBase(TServerHook serverHook, TNetworkSender networkSende
5251
/// <inheritdoc />
5352
public override void Start(SslServerAuthenticationOptions tlsOptions = null, string remoteEndpointName = null, CancellationToken token = default)
5453
{
55-
Start();
54+
Start(tlsOptions != null);
5655
base.Start(tlsOptions, remoteEndpointName, token);
5756
}
5857

5958
/// <inheritdoc />
6059
public override async Task StartAsync(SslServerAuthenticationOptions tlsOptions = null, string remoteEndpointName = null, CancellationToken token = default)
6160
{
62-
Start();
61+
Start(tlsOptions != null);
6362
await base.StartAsync(tlsOptions, remoteEndpointName, token).ConfigureAwait(false);
6463
}
6564

6665
/// <inheritdoc />
6766
public override void Start(SslClientAuthenticationOptions tlsOptions, string remoteEndpointName = null, CancellationToken token = default)
6867
{
69-
Start();
68+
Start(tlsOptions != null);
7069
base.Start(tlsOptions, remoteEndpointName, token);
7170
}
7271

7372
/// <inheritdoc />
7473
public override async Task StartAsync(SslClientAuthenticationOptions tlsOptions, string remoteEndpointName = null, CancellationToken token = default)
7574
{
76-
Start();
75+
Start(tlsOptions != null);
7776
await base.StartAsync(tlsOptions, remoteEndpointName, token).ConfigureAwait(false);
7877
}
7978

@@ -102,17 +101,22 @@ public override bool TryClose()
102101
return true;
103102
}
104103

105-
void Start()
104+
void Start(bool useTLS)
106105
{
107106
var receiveEventArgs = new SocketAsyncEventArgs { AcceptSocket = socket };
108107
receiveEventArgs.SetBuffer(networkReceiveBuffer, 0, networkReceiveBuffer.Length);
109-
receiveEventArgs.Completed += RecvEventArg_Completed;
108+
receiveEventArgs.Completed += useTLS ? RecvEventArgCompletedWithTLS : RecvEventArgCompletedWithoutTLS;
110109

111110
// If the client already have packets, avoid handling it here on the handler so we don't block future accepts.
112111
try
113112
{
114113
if (!socket.ReceiveAsync(receiveEventArgs))
115-
Task.Run(() => RecvEventArg_Completed(null, receiveEventArgs));
114+
{
115+
if (useTLS)
116+
Task.Run(() => RecvEventArgCompletedWithTLS(null, receiveEventArgs));
117+
else
118+
Task.Run(() => RecvEventArgCompletedWithoutTLS(null, receiveEventArgs));
119+
}
116120
}
117121
catch (Exception ex)
118122
{
@@ -134,7 +138,35 @@ void Dispose(SocketAsyncEventArgs e)
134138
e.Dispose();
135139
}
136140

137-
void RecvEventArg_Completed(object sender, SocketAsyncEventArgs e)
141+
void RecvEventArgCompletedWithTLS(object sender, SocketAsyncEventArgs e) =>
142+
_ = HandleReceiveWithTLSAsync(sender, e);
143+
144+
void RecvEventArgCompletedWithoutTLS(object sender, SocketAsyncEventArgs e) =>
145+
HandleReceiveWithoutTLS(sender, e);
146+
147+
private void HandleReceiveWithoutTLS(object sender, SocketAsyncEventArgs e)
148+
{
149+
try
150+
{
151+
do
152+
{
153+
if (e.BytesTransferred == 0 || e.SocketError != SocketError.Success || serverHook.Disposed)
154+
{
155+
// No more things to receive
156+
Dispose(e);
157+
break;
158+
}
159+
OnNetworkReceiveWithoutTLS(e.BytesTransferred);
160+
e.SetBuffer(networkReceiveBuffer, networkBytesRead, networkReceiveBuffer.Length - networkBytesRead);
161+
} while (!e.AcceptSocket.ReceiveAsync(e));
162+
}
163+
catch (Exception ex)
164+
{
165+
HandleReceiveFailure(ex, e);
166+
}
167+
}
168+
169+
private async ValueTask HandleReceiveWithTLSAsync(object sender, SocketAsyncEventArgs e)
138170
{
139171
try
140172
{
@@ -146,20 +178,29 @@ void RecvEventArg_Completed(object sender, SocketAsyncEventArgs e)
146178
Dispose(e);
147179
break;
148180
}
149-
OnNetworkReceive(e.BytesTransferred);
181+
var receiveTask = OnNetworkReceiveWithTLSAsync(e.BytesTransferred);
182+
if (!receiveTask.IsCompletedSuccessfully)
183+
{
184+
await receiveTask;
185+
}
150186
e.SetBuffer(networkReceiveBuffer, networkBytesRead, networkReceiveBuffer.Length - networkBytesRead);
151187
} while (!e.AcceptSocket.ReceiveAsync(e));
152188
}
153189
catch (Exception ex)
154190
{
155-
if (ex is ObjectDisposedException ex2 && ex2.ObjectName == "System.Net.Sockets.Socket")
156-
logger?.LogTrace("Accept socket was disposed at RecvEventArg_Completed");
157-
else
158-
logger?.LogError(ex, "An error occurred at RecvEventArg_Completed");
159-
Dispose(e);
191+
HandleReceiveFailure(ex, e);
160192
}
161193
}
162194

195+
void HandleReceiveFailure(Exception ex, SocketAsyncEventArgs e)
196+
{
197+
if (ex is ObjectDisposedException ex2 && ex2.ObjectName == "System.Net.Sockets.Socket")
198+
logger?.LogTrace("Accept socket was disposed at RecvEventArg_Completed");
199+
else
200+
logger?.LogError(ex, "An error occurred at RecvEventArg_Completed");
201+
Dispose(e);
202+
}
203+
163204
unsafe void AllocateNetworkReceiveBuffer()
164205
{
165206
networkReceiveBufferEntry = networkPool.Get(networkBufferSettings.initialReceiveBufferSize);

0 commit comments

Comments
 (0)