Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 73 additions & 10 deletions StreamDeckSimHub.Plugin/SimHub/SimHubConnection.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (C) 2025 Martin Renner
// Copyright (C) 2026 Martin Renner
// LGPL-3.0-or-later (see file COPYING and COPYING.LESSER)

using System.Diagnostics;
Expand Down Expand Up @@ -43,18 +43,11 @@ public interface IPropertyChangedReceiver
/// <summary>
/// Helper class which implements a <c>IPropertyChangedReceiver</c> and delegates the event to a function.
/// </summary>
public class PropertyChangedDelegate : IPropertyChangedReceiver
public class PropertyChangedDelegate(Func<PropertyChangedArgs, Task> action) : IPropertyChangedReceiver
{
private readonly Func<PropertyChangedArgs, Task> _action;

public PropertyChangedDelegate(Func<PropertyChangedArgs, Task> action)
{
_action = action;
}

public async Task PropertyChanged(PropertyChangedArgs args)
{
await _action(args);
await action(args);
}
}

Expand Down Expand Up @@ -89,6 +82,11 @@ public class SimHubConnection(IOptions<ConnectionSettings> connectionSettings, P
private static readonly Logger Logger = LogManager.GetCurrentClassLogger();
private TcpClient? _tcpClient;
private long _connected;
private bool _heartbeatEnabled;
private long _lastReceivedTicks;

/// <summary>Heartbeat timeout: reconnect if no message was received for this duration.</summary>
private static readonly TimeSpan HeartbeatTimeout = TimeSpan.FromSeconds(75);

private readonly SemaphoreSlim _semaphore = new(1);

Expand Down Expand Up @@ -136,6 +134,8 @@ await _tcpClient.ConnectAsync(_connectionSettings.Host, _connectionSettings.Port
{
Logger.Info($"Established connection to {Sanitize(line)}");
Connected = true;
_heartbeatEnabled = IsHeartbeatSupported(line);
Logger.Info($"Heartbeat monitoring: {(_heartbeatEnabled ? "enabled" : "disabled")}");
}
}
catch (Exception e)
Expand Down Expand Up @@ -365,13 +365,30 @@ private async Task ParseProperty(string line)
private async Task ReadFromServer()
{
Debug.Assert(_tcpClient != null, nameof(_tcpClient) + " != null");

using var watchdogCts = new CancellationTokenSource();
Task? watchdogTask = null;
if (_heartbeatEnabled)
{
Interlocked.Exchange(ref _lastReceivedTicks, DateTime.UtcNow.Ticks);
var watchdogToken = watchdogCts.Token;
watchdogTask = Task.Run(() => WatchdogAsync(watchdogToken), watchdogToken);
}

try
{
var reader = new LineReader(_tcpClient.GetStream());
string? line;
while ((line = await reader.ReadLineAsync()) != null)
{
Interlocked.Exchange(ref _lastReceivedTicks, DateTime.UtcNow.Ticks);
Logger.Debug($"Received from server: {Sanitize(line)}");
if (line == "ping")
{
Logger.Trace("Received ping from server");
continue;
}

if (line.StartsWith("Property "))
{
try
Expand All @@ -394,10 +411,56 @@ private async Task ReadFromServer()
// IOException: Fall through to "CloseAndReconnect".
Logger.Warn($"Received IOException while waiting for data: {ioe}");
}
finally
{
await watchdogCts.CancelAsync();
if (watchdogTask != null)
{
try { await watchdogTask; } catch (OperationCanceledException) { }
}
}

await CloseAndReconnect();
}

private async Task WatchdogAsync(CancellationToken token)
{
// Check every 10 seconds whether the last received message is too old.
while (!token.IsCancellationRequested)
{
try
{
await Task.Delay(TimeSpan.FromSeconds(10), token);
}
catch (OperationCanceledException)
{
return;
}

var lastReceived = new DateTime(Interlocked.Read(ref _lastReceivedTicks), DateTimeKind.Utc);
if (DateTime.UtcNow - lastReceived > HeartbeatTimeout)
{
Logger.Warn($"Heartbeat timeout: no message received for more than {HeartbeatTimeout.TotalSeconds}s. Closing connection.");
_tcpClient?.Close();
return;
}
}
}

/// <summary>
/// Returns <c>true</c> if the server version reported in the connect string is at least v1.6.0,
/// which is the version that introduced the heartbeat ping mechanism.
/// </summary>
private static bool IsHeartbeatSupported(string connectString)
{
// Connect string format: "SimHub Property Server v1.6.0"
var idx = connectString.LastIndexOf('v');
if (idx < 0) return false;
var versionStr = connectString.Substring(idx + 1);
if (!Version.TryParse(versionStr, out var version)) return false;
return version >= new Version(1, 6, 0);
}

private async Task WriteToServer(string line)
{
Logger.Debug($"WriteToServer: {line}");
Expand Down
Loading