-
Notifications
You must be signed in to change notification settings - Fork 711
Expand file tree
/
Copy pathCompositeDiscoveryApp.cs
More file actions
160 lines (135 loc) · 5.56 KB
/
Copy pathCompositeDiscoveryApp.cs
File metadata and controls
160 lines (135 loc) · 5.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only
using System.Net.Sockets;
using System.Runtime.InteropServices;
using Nethermind.Core.Collections;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using Nethermind.Core.ServiceStopper;
using Nethermind.Logging;
using Nethermind.Network.Config;
using Nethermind.Network.Discovery.Discv5;
using Nethermind.Serialization.Rlp;
using Nethermind.Stats.Model;
namespace Nethermind.Network.Discovery;
/// <summary>
/// Combines several protocol versions under a single <see cref="IDiscoveryApp"/> implementation.
/// </summary>
public class CompositeDiscoveryApp : IDiscoveryApp
{
private readonly INetworkConfig _networkConfig;
private readonly IConnectionsPool _connections;
private readonly IChannelFactory? _channelFactory;
private readonly IDiscoveryApp[] _discoveryApps;
private readonly CompositeNodeSource _compositeNodeSource;
private readonly ILogger _logger;
public CompositeDiscoveryApp(
INetworkConfig networkConfig,
IDiscoveryConfig discoveryConfig,
IIPResolver ipResolver,
ILogManager logManager,
Func<DiscoveryV5App> discoveryV5Factory, // These two are factory because they are optional.
Func<DiscoveryApp> discoveryV4Factory,
IChannelFactory? channelFactory = null
)
{
_networkConfig = networkConfig;
_connections = new DiscoveryConnectionsPool(logManager.GetClassLogger<DiscoveryConnectionsPool>(), ipResolver, discoveryConfig);
_channelFactory = channelFactory;
_logger = logManager.GetClassLogger<CompositeDiscoveryApp>();
List<IDiscoveryApp> discoveryApps = new(2);
if ((discoveryConfig.DiscoveryVersion & DiscoveryVersion.V4) != 0)
{
discoveryApps.Add(discoveryV4Factory());
}
if ((discoveryConfig.DiscoveryVersion & DiscoveryVersion.V5) != 0)
{
discoveryApps.Add(discoveryV5Factory());
}
_discoveryApps = [.. discoveryApps];
_compositeNodeSource = new CompositeNodeSource(_discoveryApps);
}
public void InitializeChannel(IChannel channel)
=> ForEachDiscoveryApp(static (discoveryApp, state) => discoveryApp.InitializeChannel(state), channel);
public async Task StartAsync()
{
if (_discoveryApps.Length == 0) return;
Bootstrap bootstrap = new Bootstrap()
.Group(new MultithreadEventLoopGroup(1))
.Option(ChannelOption.Allocator, NethermindBuffers.DiscoveryAllocator)
.Option(ChannelOption.RcvbufAllocator, new FixedRecvByteBufAllocator(2048 * 2))
;
if (_channelFactory is not null)
bootstrap.ChannelFactory(() => _channelFactory!.CreateDatagramChannel());
else if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
bootstrap.ChannelFactory(static () => new SocketDatagramChannel(AddressFamily.InterNetwork));
else
bootstrap.Channel<SocketDatagramChannel>();
bootstrap.Handler(new ActionChannelInitializer<IDatagramChannel>(InitializeChannel));
await _connections.BindAsync(bootstrap, _networkConfig.DiscoveryPort);
await WhenAllDiscoveryApps(static discoveryApp => discoveryApp.StartAsync());
}
public async Task StopAsync()
{
try
{
await Task.WhenAll(_connections.StopAsync(), WhenAllDiscoveryApps(static discoveryApp => discoveryApp.StopAsync()));
}
finally
{
_compositeNodeSource.Dispose();
await DisposeDiscoveryApps();
}
}
string IStoppableService.Description => "discovery connection";
public void AddNodeToDiscovery(Node node) => ForEachDiscoveryApp(static (discoveryApp, discoveredNode) => discoveryApp.AddNodeToDiscovery(discoveredNode), node);
private void ForEachDiscoveryApp<TState>(Action<IDiscoveryApp, TState> action, TState state)
{
IDiscoveryApp[] discoveryApps = _discoveryApps;
for (int i = 0; i < discoveryApps.Length; i++)
{
action(discoveryApps[i], state);
}
}
private Task WhenAllDiscoveryApps(Func<IDiscoveryApp, Task> action)
{
IDiscoveryApp[] discoveryApps = _discoveryApps;
if (discoveryApps.Length == 0)
{
return Task.CompletedTask;
}
ArrayPoolListRef<Task> tasks = new(discoveryApps.Length);
for (int i = 0; i < discoveryApps.Length; i++)
{
tasks.Add(action(discoveryApps[i]));
}
Task result = Task.WhenAll(tasks.AsSpan());
tasks.Dispose();
return result;
}
private async Task DisposeDiscoveryApps()
{
IDiscoveryApp[] discoveryApps = _discoveryApps;
for (int i = 0; i < discoveryApps.Length; i++)
{
if (discoveryApps[i] is IAsyncDisposable asyncDisposable)
{
try
{
await asyncDisposable.DisposeAsync();
}
catch (Exception e)
{
if (_logger.IsWarn) _logger.Warn($"Error disposing discovery app {discoveryApps[i]}: {e}");
}
}
}
}
public IAsyncEnumerable<Node> DiscoverNodes(CancellationToken cancellationToken) => _compositeNodeSource.DiscoverNodes(cancellationToken);
public event EventHandler<NodeEventArgs>? NodeRemoved
{
add => _compositeNodeSource.NodeRemoved += value;
remove => _compositeNodeSource.NodeRemoved -= value;
}
}