Skip to content

Commit 9850142

Browse files
authored
Fix behavior when connection drops (#56)
* add a method to check if the TCP connection is open; dispose of the TransportService correctly in case of disconnections; Signed-off-by: Níckolas Goline <nickolas.goline+github@gmail.com> * fix tests Signed-off-by: Níckolas Goline <nickolas.goline+github@gmail.com> --------- Signed-off-by: Níckolas Goline <nickolas.goline+github@gmail.com>
1 parent d7e4eec commit 9850142

File tree

11 files changed

+299
-107
lines changed

11 files changed

+299
-107
lines changed

src/NLightning.Application/Node/Managers/PeerManager.cs

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,14 @@ public async Task StopAsync()
8989
throw new InvalidOperationException($"{nameof(PeerManager)} is not running");
9090

9191
foreach (var peerKey in _peers.Keys)
92-
DisconnectPeer(peerKey);
92+
try
93+
{
94+
DisconnectPeer(peerKey);
95+
}
96+
catch (Exception e)
97+
{
98+
_logger.LogWarning(e, "Error disconnecting peer {Peer}", peerKey);
99+
}
93100

94101
try
95102
{
@@ -174,10 +181,11 @@ private void HandleNewPeerConnected(object? _, NewPeerConnectedEventArgs args)
174181
{
175182
// Create the peer
176183
var peerService = _peerServiceFactory.CreateConnectingPeerAsync(args.TcpClient).GetAwaiter().GetResult();
177-
178184
peerService.OnDisconnect += HandlePeerDisconnection;
179185
peerService.OnChannelMessageReceived += HandlePeerChannelMessage;
180186

187+
_logger.LogTrace("PeerService created for peer {PeerPubKey}", peerService.PeerPubKey);
188+
181189
var peer = new PeerModel(peerService.PeerPubKey, args.Host, args.Port)
182190
{
183191
LastSeenAt = DateTime.UtcNow
@@ -192,12 +200,23 @@ private void HandleNewPeerConnected(object? _, NewPeerConnectedEventArgs args)
192200
}
193201
}
194202

195-
private void HandlePeerDisconnection(object? _, PeerDisconnectedEventArgs args)
203+
private void HandlePeerDisconnection(object? sender, PeerDisconnectedEventArgs args)
196204
{
197205
ArgumentNullException.ThrowIfNull(args);
198206

199207
_peers.Remove(args.PeerPubKey);
200208
_logger.LogInformation("Peer {Peer} disconnected", args.PeerPubKey);
209+
210+
if (sender is IPeerService peerService)
211+
{
212+
peerService.OnDisconnect -= HandlePeerDisconnection;
213+
peerService.OnChannelMessageReceived -= HandlePeerChannelMessage;
214+
}
215+
else
216+
{
217+
_logger.LogWarning("Peer {Peer} disconnected, but we were unable to detach event handlers",
218+
args.PeerPubKey);
219+
}
201220
}
202221

203222
private void HandlePeerChannelMessage(object? _, ChannelMessageEventArgs args)

src/NLightning.Domain/Protocol/Interfaces/IMessageService.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@ public interface IMessageService : IDisposable
1212
/// Sends a message.
1313
/// </summary>
1414
/// <param name="message">The message to send.</param>
15+
/// <param name="throwOnException">If true, exceptions will be thrown instead of being raised via the OnExceptionRaised event.</param>
1516
/// <param name="cancellationToken">The cancellation token.</param>
1617
/// <returns>A task that represents the asynchronous operation.</returns>
17-
Task SendMessageAsync(IMessage message, CancellationToken cancellationToken = default);
18+
Task SendMessageAsync(IMessage message, bool throwOnException = false,
19+
CancellationToken cancellationToken = default);
1820

1921
/// <summary>
2022
/// Event that is raised when a message is received.

src/NLightning.Infrastructure/Node/Factories/PeerServiceFactory.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public async Task<IPeerService> CreateConnectedPeerAsync(CompactPubKey peerPubKe
5959
}
6060
catch (Exception ex)
6161
{
62+
transportService.Dispose();
6263
throw new ConnectionException($"Error connecting to peer {peerPubKey}", ex);
6364
}
6465

@@ -100,6 +101,7 @@ public async Task<IPeerService> CreateConnectingPeerAsync(TcpClient tcpClient)
100101
}
101102
catch (Exception ex)
102103
{
104+
transportService.Dispose();
103105
throw new ConnectionException($"Error establishing connection to peer {ipAddress}:{port}", ex);
104106
}
105107

src/NLightning.Infrastructure/Node/Services/PeerCommunicationService.cs

Lines changed: 65 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@ namespace NLightning.Infrastructure.Node.Services;
1818
/// </summary>
1919
public class PeerCommunicationService : IPeerCommunicationService
2020
{
21-
private readonly CancellationTokenSource _cancellationTokenSource = new();
21+
private readonly CancellationTokenSource _cts = new();
2222
private readonly ILogger<PeerCommunicationService> _logger;
2323
private readonly IMessageService _messageService;
2424
private readonly IPingPongService _pingPongService;
2525
private readonly IServiceProvider _serviceProvider;
2626
private readonly IMessageFactory _messageFactory;
27+
private readonly TaskCompletionSource<bool> _pingPongTcs = new();
28+
2729
private bool _isInitialized;
30+
private CancellationTokenSource? _initWaitCancellationTokenSource;
2831

2932
/// <inheritdoc />
3033
public event EventHandler<IMessage?>? MessageReceived;
@@ -69,16 +72,11 @@ public PeerCommunicationService(ILogger<PeerCommunicationService> logger, IMessa
6972
/// <inheritdoc />
7073
public async Task InitializeAsync(TimeSpan networkTimeout)
7174
{
72-
// Always send an init message upon connection
73-
_logger.LogTrace("Sending init message to peer {peer}", PeerCompactPubKey);
74-
var initMessage = _messageFactory.CreateInitMessage();
75-
await _messageService.SendMessageAsync(initMessage, _cancellationTokenSource.Token);
76-
77-
// Wait for an init message
7875
_logger.LogTrace("Waiting for init message from peer {peer}", PeerCompactPubKey);
7976

8077
// Set timeout to close connection if the other peer doesn't send an init message
81-
_ = Task.Delay(networkTimeout, _cancellationTokenSource.Token).ContinueWith(task =>
78+
_initWaitCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(_cts.Token);
79+
_ = Task.Delay(networkTimeout, _initWaitCancellationTokenSource.Token).ContinueWith(task =>
8280
{
8381
if (!task.IsCanceled && !_isInitialized)
8482
{
@@ -87,21 +85,35 @@ public async Task InitializeAsync(TimeSpan networkTimeout)
8785
}
8886
});
8987

90-
if (!_messageService.IsConnected)
88+
// Always send an init message upon connection
89+
_logger.LogTrace("Sending init message to peer {peer}", PeerCompactPubKey);
90+
var initMessage = _messageFactory.CreateInitMessage();
91+
try
9192
{
92-
throw new ConnectionException($"Failed to connect to peer {PeerCompactPubKey}");
93+
await _messageService.SendMessageAsync(initMessage, true, _cts.Token);
94+
}
95+
catch (Exception e)
96+
{
97+
_pingPongTcs.TrySetResult(true);
98+
throw new ConnectionException($"Failed to send init message to peer {PeerCompactPubKey}", e);
9399
}
94100

95101
// Set up ping service to keep connection alive
96-
SetupPingPongService();
102+
if (!_cts.IsCancellationRequested)
103+
{
104+
if (!_messageService.IsConnected)
105+
throw new ConnectionException($"Failed to connect to peer {PeerCompactPubKey}");
106+
107+
SetupPingPongService();
108+
}
97109
}
98110

99111
/// <inheritdoc />
100112
public async Task SendMessageAsync(IMessage message, CancellationToken cancellationToken = default)
101113
{
102114
try
103115
{
104-
await _messageService.SendMessageAsync(message, cancellationToken);
116+
await _messageService.SendMessageAsync(message, cancellationToken: cancellationToken);
105117
}
106118
catch (Exception ex)
107119
{
@@ -112,11 +124,18 @@ public async Task SendMessageAsync(IMessage message, CancellationToken cancellat
112124
/// <inheritdoc />
113125
public void Disconnect()
114126
{
115-
_logger.LogInformation("Disconnecting peer {peer}", PeerCompactPubKey);
116-
_cancellationTokenSource.Cancel();
117-
_messageService.Dispose();
118-
119-
DisconnectEvent?.Invoke(this, EventArgs.Empty);
127+
try
128+
{
129+
_ = _cts.CancelAsync();
130+
_logger.LogTrace("Waiting for ping service to stop for peer {peer}", PeerCompactPubKey);
131+
_pingPongTcs.Task.Wait(TimeSpan.FromSeconds(5));
132+
_logger.LogTrace("Ping service stopped for peer {peer}", PeerCompactPubKey);
133+
}
134+
finally
135+
{
136+
_messageService.Dispose();
137+
DisconnectEvent?.Invoke(this, EventArgs.Empty);
138+
}
120139
}
121140

122141
private void SetupPingPongService()
@@ -125,13 +144,10 @@ private void SetupPingPongService()
125144
_pingPongService.OnPongReceived += HandlePongReceived;
126145

127146
// Setup Ping to keep connection alive
128-
_ = _pingPongService.StartPingAsync(_cancellationTokenSource.Token).ContinueWith(task =>
147+
_ = _pingPongService.StartPingAsync(_cts.Token).ContinueWith(_ =>
129148
{
130-
if (task.IsFaulted)
131-
{
132-
RaiseException(new ConnectionException($"Failed to start ping service for peer {PeerCompactPubKey}",
133-
task.Exception));
134-
}
149+
_logger.LogTrace("Ping service stopped for peer {peer}, setting result", PeerCompactPubKey);
150+
_pingPongTcs.TrySetResult(true);
135151
});
136152

137153
_logger.LogInformation("Ping service started for peer {peer}", PeerCompactPubKey);
@@ -154,7 +170,7 @@ private void HandlePingMessageReady(object? sender, IMessage pingMessage)
154170

155171
try
156172
{
157-
_messageService.SendMessageAsync(pingMessage, _cancellationTokenSource.Token).GetAwaiter().GetResult();
173+
_messageService.SendMessageAsync(pingMessage, cancellationToken: _cts.Token).GetAwaiter().GetResult();
158174
}
159175
catch (Exception ex)
160176
{
@@ -172,6 +188,7 @@ private void HandleMessageReceived(object? sender, IMessage? message)
172188
if (!_isInitialized && message.Type == MessageTypes.Init)
173189
{
174190
_isInitialized = true;
191+
_initWaitCancellationTokenSource?.Cancel();
175192
}
176193

177194
// Forward the message to subscribers
@@ -206,6 +223,7 @@ private void RaiseException(Exception exception)
206223
{
207224
ChannelId? channelId = null;
208225
var message = errorException.Message;
226+
mustDisconnect = true;
209227

210228
if (errorException is ChannelErrorException channelErrorException)
211229
{
@@ -214,8 +232,16 @@ private void RaiseException(Exception exception)
214232
message = channelErrorException.PeerMessage;
215233
}
216234

217-
_messageService.SendMessageAsync(new ErrorMessage(new ErrorPayload(channelId, message)));
218-
mustDisconnect = true;
235+
if (errorException is not ConnectionException)
236+
{
237+
_logger.LogTrace("Sending error message to peer {peer}. ChannelId: {channelId}, Message: {message}",
238+
PeerCompactPubKey, channelId, message);
239+
240+
_ = Task.Run(() => _messageService.SendMessageAsync(
241+
new ErrorMessage(new ErrorPayload(channelId, message))));
242+
243+
return;
244+
}
219245
}
220246
else if (exception is WarningException warningException)
221247
{
@@ -229,17 +255,25 @@ private void RaiseException(Exception exception)
229255
message = channelWarningException.PeerMessage;
230256
}
231257

232-
_messageService.SendMessageAsync(new WarningMessage(new ErrorPayload(channelId, message)));
233-
}
258+
_logger.LogTrace("Sending warning message to peer {peer}. ChannelId: {channelId}, Message: {message}",
259+
PeerCompactPubKey, channelId, message);
234260

235-
_logger.LogError(exception, "Exception occurred with peer {peer}. {exceptionMessage}", PeerCompactPubKey,
236-
exception.Message);
261+
_ = Task.Run(() => _messageService.SendMessageAsync(
262+
new WarningMessage(new ErrorPayload(channelId, message))));
263+
}
237264

238265
// Forward the exception to subscribers
239266
ExceptionRaised?.Invoke(this, exception);
240267

241268
// Disconnect if not already disconnecting
242-
if (mustDisconnect && !_cancellationTokenSource.IsCancellationRequested)
269+
if (mustDisconnect && !_cts.IsCancellationRequested)
270+
{
271+
_messageService.OnMessageReceived -= HandleMessageReceived;
272+
_messageService.OnExceptionRaised -= HandleExceptionRaised;
273+
_pingPongService.DisconnectEvent -= HandleExceptionRaised;
274+
275+
_logger.LogWarning(exception, "We're disconnecting peer {peer} because of an exception", PeerCompactPubKey);
243276
Disconnect();
277+
}
244278
}
245279
}

src/NLightning.Infrastructure/Node/Services/PeerService.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
namespace NLightning.Infrastructure.Node.Services;
55

66
using Domain.Crypto.ValueObjects;
7+
using Domain.Exceptions;
78
using Domain.Node.Events;
89
using Domain.Node.Interfaces;
910
using Domain.Node.Options;
@@ -56,7 +57,18 @@ public PeerService(IPeerCommunicationService peerCommunicationService, FeatureOp
5657
_peerCommunicationService.DisconnectEvent += HandleDisconnection;
5758

5859
// Initialize communication
59-
_peerCommunicationService.InitializeAsync(networkTimeout).GetAwaiter().GetResult();
60+
try
61+
{
62+
_peerCommunicationService.InitializeAsync(networkTimeout).GetAwaiter().GetResult();
63+
}
64+
catch (Exception e)
65+
{
66+
_peerCommunicationService.MessageReceived -= HandleMessage;
67+
_peerCommunicationService.ExceptionRaised -= HandleException;
68+
_peerCommunicationService.DisconnectEvent -= HandleDisconnection;
69+
70+
throw new ErrorException("Error initializing peer communication", e);
71+
}
6072
}
6173

6274
/// <summary>
@@ -83,7 +95,6 @@ private void HandleMessage(object? sender, IMessage? message)
8395

8496
if (!_isInitialized)
8597
{
86-
_logger.LogTrace("Received message from peer {peer} but was not initialized", PeerPubKey);
8798
HandleInitialization(message);
8899
}
89100
else if (message is IChannelMessage channelMessage)
@@ -103,6 +114,7 @@ private void HandleException(object? sender, Exception e)
103114

104115
private void HandleDisconnection(object? sender, EventArgs e)
105116
{
117+
_logger.LogTrace("Handling disconnection for peer {Peer}", PeerPubKey);
106118
OnDisconnect?.Invoke(this, new PeerDisconnectedEventArgs(PeerPubKey));
107119
}
108120

0 commit comments

Comments
 (0)