Skip to content

Commit 1938551

Browse files
authored
Merge pull request #220 from pre-martin/feature/heartbeat
Heartbeat protocol to detect faulty connections.
2 parents 1818969 + 3b6081f commit 1938551

1 file changed

Lines changed: 73 additions & 10 deletions

File tree

StreamDeckSimHub.Plugin/SimHub/SimHubConnection.cs

Lines changed: 73 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (C) 2025 Martin Renner
1+
// Copyright (C) 2026 Martin Renner
22
// LGPL-3.0-or-later (see file COPYING and COPYING.LESSER)
33

44
using System.Diagnostics;
@@ -43,18 +43,11 @@ public interface IPropertyChangedReceiver
4343
/// <summary>
4444
/// Helper class which implements a <c>IPropertyChangedReceiver</c> and delegates the event to a function.
4545
/// </summary>
46-
public class PropertyChangedDelegate : IPropertyChangedReceiver
46+
public class PropertyChangedDelegate(Func<PropertyChangedArgs, Task> action) : IPropertyChangedReceiver
4747
{
48-
private readonly Func<PropertyChangedArgs, Task> _action;
49-
50-
public PropertyChangedDelegate(Func<PropertyChangedArgs, Task> action)
51-
{
52-
_action = action;
53-
}
54-
5548
public async Task PropertyChanged(PropertyChangedArgs args)
5649
{
57-
await _action(args);
50+
await action(args);
5851
}
5952
}
6053

@@ -89,6 +82,11 @@ public class SimHubConnection(IOptions<ConnectionSettings> connectionSettings, P
8982
private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
9083
private TcpClient? _tcpClient;
9184
private long _connected;
85+
private bool _heartbeatEnabled;
86+
private long _lastReceivedTicks;
87+
88+
/// <summary>Heartbeat timeout: reconnect if no message was received for this duration.</summary>
89+
private static readonly TimeSpan HeartbeatTimeout = TimeSpan.FromSeconds(75);
9290

9391
private readonly SemaphoreSlim _semaphore = new(1);
9492

@@ -136,6 +134,8 @@ await _tcpClient.ConnectAsync(_connectionSettings.Host, _connectionSettings.Port
136134
{
137135
Logger.Info($"Established connection to {Sanitize(line)}");
138136
Connected = true;
137+
_heartbeatEnabled = IsHeartbeatSupported(line);
138+
Logger.Info($"Heartbeat monitoring: {(_heartbeatEnabled ? "enabled" : "disabled")}");
139139
}
140140
}
141141
catch (Exception e)
@@ -365,13 +365,30 @@ private async Task ParseProperty(string line)
365365
private async Task ReadFromServer()
366366
{
367367
Debug.Assert(_tcpClient != null, nameof(_tcpClient) + " != null");
368+
369+
using var watchdogCts = new CancellationTokenSource();
370+
Task? watchdogTask = null;
371+
if (_heartbeatEnabled)
372+
{
373+
Interlocked.Exchange(ref _lastReceivedTicks, DateTime.UtcNow.Ticks);
374+
var watchdogToken = watchdogCts.Token;
375+
watchdogTask = Task.Run(() => WatchdogAsync(watchdogToken), watchdogToken);
376+
}
377+
368378
try
369379
{
370380
var reader = new LineReader(_tcpClient.GetStream());
371381
string? line;
372382
while ((line = await reader.ReadLineAsync()) != null)
373383
{
384+
Interlocked.Exchange(ref _lastReceivedTicks, DateTime.UtcNow.Ticks);
374385
Logger.Debug($"Received from server: {Sanitize(line)}");
386+
if (line == "ping")
387+
{
388+
Logger.Trace("Received ping from server");
389+
continue;
390+
}
391+
375392
if (line.StartsWith("Property "))
376393
{
377394
try
@@ -394,10 +411,56 @@ private async Task ReadFromServer()
394411
// IOException: Fall through to "CloseAndReconnect".
395412
Logger.Warn($"Received IOException while waiting for data: {ioe}");
396413
}
414+
finally
415+
{
416+
await watchdogCts.CancelAsync();
417+
if (watchdogTask != null)
418+
{
419+
try { await watchdogTask; } catch (OperationCanceledException) { }
420+
}
421+
}
397422

398423
await CloseAndReconnect();
399424
}
400425

426+
private async Task WatchdogAsync(CancellationToken token)
427+
{
428+
// Check every 10 seconds whether the last received message is too old.
429+
while (!token.IsCancellationRequested)
430+
{
431+
try
432+
{
433+
await Task.Delay(TimeSpan.FromSeconds(10), token);
434+
}
435+
catch (OperationCanceledException)
436+
{
437+
return;
438+
}
439+
440+
var lastReceived = new DateTime(Interlocked.Read(ref _lastReceivedTicks), DateTimeKind.Utc);
441+
if (DateTime.UtcNow - lastReceived > HeartbeatTimeout)
442+
{
443+
Logger.Warn($"Heartbeat timeout: no message received for more than {HeartbeatTimeout.TotalSeconds}s. Closing connection.");
444+
_tcpClient?.Close();
445+
return;
446+
}
447+
}
448+
}
449+
450+
/// <summary>
451+
/// Returns <c>true</c> if the server version reported in the connect string is at least v1.6.0,
452+
/// which is the version that introduced the heartbeat ping mechanism.
453+
/// </summary>
454+
private static bool IsHeartbeatSupported(string connectString)
455+
{
456+
// Connect string format: "SimHub Property Server v1.6.0"
457+
var idx = connectString.LastIndexOf('v');
458+
if (idx < 0) return false;
459+
var versionStr = connectString.Substring(idx + 1);
460+
if (!Version.TryParse(versionStr, out var version)) return false;
461+
return version >= new Version(1, 6, 0);
462+
}
463+
401464
private async Task WriteToServer(string line)
402465
{
403466
Logger.Debug($"WriteToServer: {line}");

0 commit comments

Comments
 (0)