Skip to content

Commit 58215ec

Browse files
Switch to async I/O
1 parent dd3e02d commit 58215ec

17 files changed

+720
-48
lines changed

Engine/Infrastructure/Endpoints/EndPoint.cs

+11-8
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22
using System.IO;
33
using System.Net;
44
using System.Net.Sockets;
5-
using System.Threading;
65
using System.Threading.Tasks;
76

87
using GenHTTP.Api.Infrastructure;
98

109
using GenHTTP.Engine.Infrastructure.Configuration;
10+
using GenHTTP.Engine.Infrastructure.Transport;
1111

1212
using PooledAwait;
1313

@@ -93,18 +93,21 @@ private async Task Listen()
9393

9494
private void Handle(Socket client)
9595
{
96-
using var _ = ExecutionContext.SuppressFlow();
96+
client.NoDelay = true;
97+
98+
var connection = new SocketConnection(client);
99+
100+
connection.Run();
101+
102+
Accept(connection);
97103

98-
Task.Run(() => Accept(client));
99104
}
100105

101-
protected abstract PooledValueTask Accept(Socket client);
106+
protected abstract PooledValueTask Accept(SocketConnection connection);
102107

103-
protected PooledValueTask Handle(Socket client, Stream inputStream)
108+
protected PooledValueTask Handle(SocketConnection connection)
104109
{
105-
client.NoDelay = true;
106-
107-
return new ClientHandler(client, inputStream, Server, this, Configuration).Run();
110+
return new ClientHandler(connection, Server, this, Configuration).Run();
108111
}
109112

110113
#endregion

Engine/Infrastructure/Endpoints/InsecureEndPoint.cs

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
using System.Net;
2-
using System.Net.Sockets;
32

43
using GenHTTP.Api.Infrastructure;
54

65
using GenHTTP.Engine.Infrastructure.Configuration;
7-
using GenHTTP.Engine.Utilities;
6+
using GenHTTP.Engine.Infrastructure.Transport;
87

98
using PooledAwait;
109

@@ -32,7 +31,7 @@ internal InsecureEndPoint(IServer server, IPEndPoint endPoint, NetworkConfigurat
3231

3332
#region Functionality
3433

35-
protected override PooledValueTask Accept(Socket client) => Handle(client, new PoolBufferedStream(new NetworkStream(client)));
34+
protected override PooledValueTask Accept(SocketConnection connection) => Handle(connection);
3635

3736
#endregion
3837

Engine/Infrastructure/Endpoints/SecureEndPoint.cs

+6-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
using GenHTTP.Api.Infrastructure;
1010

1111
using GenHTTP.Engine.Infrastructure.Configuration;
12+
using GenHTTP.Engine.Infrastructure.Transport;
1213
using GenHTTP.Engine.Utilities;
1314

1415
using PooledAwait;
@@ -52,9 +53,11 @@ internal SecureEndPoint(IServer server, IPEndPoint endPoint, SecurityConfigurati
5253

5354
#region Functionality
5455

55-
protected override async PooledValueTask Accept(Socket client)
56+
protected override PooledValueTask Accept(SocketConnection connection)
5657
{
57-
var stream = await TryAuthenticate(client);
58+
return Handle(connection);
59+
60+
/*var stream = await TryAuthenticate(client).ConfigureAwait(false);
5861
5962
if (stream is not null)
6063
{
@@ -71,7 +74,7 @@ protected override async PooledValueTask Accept(Socket client)
7174
{
7275
Server.Companion?.OnServerError(ServerErrorScope.ClientConnection, e);
7376
}
74-
}
77+
}*/
7578
}
7679

7780
private async ValueTask<SslStream?> TryAuthenticate(Socket client)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
using System.IO.Pipelines;
2+
3+
namespace GenHTTP.Engine.Infrastructure.Transport
4+
{
5+
6+
internal sealed class DuplexPipe : IDuplexPipe
7+
{
8+
9+
#region Get-/Setters
10+
11+
public PipeReader Input { get; }
12+
13+
public PipeWriter Output { get; }
14+
15+
#endregion
16+
17+
#region Initialization
18+
19+
public DuplexPipe(PipeReader reader, PipeWriter writer)
20+
{
21+
Input = reader;
22+
Output = writer;
23+
}
24+
25+
#endregion
26+
27+
}
28+
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
using System;
2+
using System.Runtime.InteropServices;
3+
4+
namespace GenHTTP.Engine.Infrastructure.Transport
5+
{
6+
7+
internal static class Extensions
8+
{
9+
10+
public static ArraySegment<byte> GetArray(this Memory<byte> memory)
11+
{
12+
return ((ReadOnlyMemory<byte>)memory).GetArray();
13+
}
14+
15+
public static ArraySegment<byte> GetArray(this ReadOnlyMemory<byte> memory)
16+
{
17+
if (!MemoryMarshal.TryGetArray(memory, out var result))
18+
{
19+
throw new InvalidOperationException("Buffer backed by array was expected");
20+
}
21+
return result;
22+
}
23+
24+
}
25+
26+
}
+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.IO.Pipelines;
4+
using System.Threading;
5+
6+
namespace GenHTTP.Engine.Infrastructure.Transport
7+
{
8+
9+
internal sealed class IOQueue : PipeScheduler, IThreadPoolWorkItem
10+
{
11+
private readonly ConcurrentQueue<Work> _workItems = new();
12+
13+
private int _doingWork;
14+
15+
public override void Schedule(Action<object?> action, object? state)
16+
{
17+
_workItems.Enqueue(new Work(action, state));
18+
19+
// Set working if it wasn't (via atomic Interlocked).
20+
if (Interlocked.CompareExchange(ref _doingWork, 1, 0) == 0)
21+
{
22+
// Wasn't working, schedule.
23+
System.Threading.ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false);
24+
}
25+
}
26+
27+
void IThreadPoolWorkItem.Execute()
28+
{
29+
while (true)
30+
{
31+
while (_workItems.TryDequeue(out Work item))
32+
{
33+
item.Callback(item.State);
34+
}
35+
36+
// All work done.
37+
38+
// Set _doingWork (0 == false) prior to checking IsEmpty to catch any missed work in interim.
39+
// This doesn't need to be volatile due to the following barrier (i.e. it is volatile).
40+
_doingWork = 0;
41+
42+
// Ensure _doingWork is written before IsEmpty is read.
43+
// As they are two different memory locations, we insert a barrier to guarantee ordering.
44+
Thread.MemoryBarrier();
45+
46+
// Check if there is work to do
47+
if (_workItems.IsEmpty)
48+
{
49+
// Nothing to do, exit.
50+
break;
51+
}
52+
53+
// Is work, can we set it as active again (via atomic Interlocked), prior to scheduling?
54+
if (Interlocked.Exchange(ref _doingWork, 1) == 1)
55+
{
56+
// Execute has been rescheduled already, exit.
57+
break;
58+
}
59+
60+
// Is work, wasn't already scheduled so continue loop.
61+
}
62+
}
63+
64+
private readonly struct Work
65+
{
66+
public readonly Action<object?> Callback;
67+
68+
public readonly object? State;
69+
70+
public Work(Action<object?> callback, object? state)
71+
{
72+
Callback = callback;
73+
State = state;
74+
}
75+
}
76+
77+
}
78+
79+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.IO.Pipelines;
4+
using System.Threading;
5+
6+
namespace GenHTTP.Engine.Infrastructure.Transport
7+
{
8+
9+
internal sealed class SocketSenderPool : IDisposable
10+
{
11+
private const int MaxQueueSize = 1024;
12+
13+
private readonly ConcurrentQueue<SocketSender> _queue = new();
14+
15+
private readonly PipeScheduler _scheduler;
16+
17+
private int _count;
18+
private bool _disposed;
19+
20+
#region Get-/Setters
21+
22+
public PipeScheduler Scheduler => _scheduler;
23+
24+
#endregion
25+
26+
#region Initialization
27+
28+
public SocketSenderPool()
29+
{
30+
_scheduler = PipeScheduler.Inline;
31+
}
32+
33+
#endregion
34+
35+
#region Functionality
36+
37+
public SocketSender Rent()
38+
{
39+
if (_queue.TryDequeue(out var sender))
40+
{
41+
Interlocked.Decrement(ref _count);
42+
return sender;
43+
}
44+
return new SocketSender(_scheduler);
45+
}
46+
47+
public void Return(SocketSender sender)
48+
{
49+
// This counting isn't accurate, but it's good enough for what we need to avoid using _queue.Count which could be expensive
50+
if (_disposed || Interlocked.Increment(ref _count) > MaxQueueSize)
51+
{
52+
Interlocked.Decrement(ref _count);
53+
sender.Dispose();
54+
return;
55+
}
56+
57+
sender.Reset();
58+
_queue.Enqueue(sender);
59+
}
60+
61+
public void Dispose()
62+
{
63+
if (!_disposed)
64+
{
65+
_disposed = true;
66+
while (_queue.TryDequeue(out var sender))
67+
{
68+
sender.Dispose();
69+
}
70+
}
71+
}
72+
73+
#endregion
74+
75+
}
76+
77+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
using System.Buffers;
2+
using System.IO.Pipelines;
3+
4+
namespace GenHTTP.Engine.Infrastructure.Transport
5+
{
6+
7+
internal readonly struct ServicePipe
8+
{
9+
private static readonly MemoryPool<byte> MEMORY_POOL = MemoryPool<byte>.Shared;
10+
11+
#region Get-/Setters
12+
13+
internal IDuplexPipe Transport { get; }
14+
15+
internal IDuplexPipe Application { get; }
16+
17+
internal IOQueue Scheduler { get; }
18+
19+
#endregion
20+
21+
#region Initialization
22+
23+
internal static ServicePipe Create()
24+
{
25+
var applicationScheduler = PipeScheduler.ThreadPool;
26+
27+
var transportScheduler = new IOQueue();
28+
29+
var inputOptions = new PipeOptions(MEMORY_POOL, applicationScheduler, transportScheduler, useSynchronizationContext: false);
30+
31+
var outputOptions = new PipeOptions(MEMORY_POOL, transportScheduler, applicationScheduler, useSynchronizationContext: false);
32+
33+
var input = new Pipe(inputOptions);
34+
var output = new Pipe(outputOptions);
35+
36+
var transport = new DuplexPipe(output.Reader, input.Writer);
37+
var application = new DuplexPipe(input.Reader, output.Writer);
38+
39+
return new ServicePipe(application, transport, transportScheduler);
40+
}
41+
42+
private ServicePipe(IDuplexPipe transport, IDuplexPipe application, IOQueue scheduler)
43+
{
44+
Transport = transport;
45+
Application = application;
46+
47+
Scheduler = scheduler;
48+
}
49+
50+
#endregion
51+
52+
}
53+
54+
}

0 commit comments

Comments
 (0)