Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5892cc8
rename DBusWireConnection to LibDBusWireConnection
jmacato Mar 5, 2026
8032895
add DBusSerializedMessage record and IDBusMessageSerializer interface
jmacato Mar 5, 2026
ba2d218
add DBusWireWriter for managed D-Bus wire format serialization
jmacato Mar 5, 2026
9db4eea
add DBusWireReader for managed D-Bus wire format deserialization
jmacato Mar 5, 2026
53e7799
add ManagedDBusMessageSerializer for full message serialize/deserialize
jmacato Mar 5, 2026
07adb33
cross-validate ManagedDBusMessageSerializer against NDesk reference
jmacato Mar 5, 2026
befe827
add ChannelsDBusWireConnection with channel-based IDBusWireConnection
jmacato Mar 5, 2026
d091395
add UnixSocketDBusTransport for socket-based D-Bus wire framing
jmacato Mar 5, 2026
c8ee49c
add P2P end-to-end integration tests for wire connection stack
jmacato Mar 5, 2026
b1e5a55
make _disposed atomic in ChannelsDBusWireConnection
jmacato Mar 5, 2026
191098f
add socket/CTS ownership to ChannelsDBusWireConnection
jmacato Mar 5, 2026
516746d
replace bare catch with typed exception filter in receive loop
jmacato Mar 5, 2026
e7dbb13
SendWithReplyAsync uses WriteAsync + disposes CancellationTokenRegist…
jmacato Mar 5, 2026
f15599c
expose CTS from DBusTransport.FromSocket, thread into ConnectUnix
jmacato Mar 5, 2026
794f681
update test callers to use new FromSocket 5-tuple signature
jmacato Mar 5, 2026
716f3d9
add container type cross-validation tests (arrays, variants, dicts)
jmacato Mar 5, 2026
cddda41
harden up the serdes code
jmacato Mar 6, 2026
3c84272
rename DBusMessageMarshaler and DbusWireWorker to Lib-prefixed names
jmacato Mar 6, 2026
cb958fb
Add new logging mechanism, and harden up serialization. and more tests.
jmacato Mar 6, 2026
180eeb6
update docs
jmacato Mar 6, 2026
d07f31b
fix review comments
jmacato Mar 6, 2026
1445b11
rename DBusWireWorker to LibDBusWireWorker.
jmacato Mar 6, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Avalonia.DBus/Avalonia.DBus.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

<ItemGroup>
<InternalsVisibleTo Include="Avalonia.DBus.Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100c1bba1142285fe0419326fb25866ba62c47e6c2b5c1ab0c95b46413fad375471232cb81706932e1cef38781b9ebd39d5100401bacb651c6c5bbf59e571e81b3bc08d2a622004e08b1a6ece82a7e0b9857525c86d2b95fab4bc3dce148558d7f3ae61aa3a234086902aeface87d9dfdd32b9d2fe3c6dd4055b5ab4b104998bd87" />
<InternalsVisibleTo Include="Avalonia.DBus.WireMarshalling.Tests, PublicKey=0024000004800000940000000602000000240000525341310004000001000100c1bba1142285fe0419326fb25866ba62c47e6c2b5c1ab0c95b46413fad375471232cb81706932e1cef38781b9ebd39d5100401bacb651c6c5bbf59e571e81b3bc08d2a622004e08b1a6ece82a7e0b9857525c86d2b95fab4bc3dce148558d7f3ae61aa3a234086902aeface87d9dfdd32b9d2fe3c6dd4055b5ab4b104998bd87" />
</ItemGroup>

</Project>
276 changes: 276 additions & 0 deletions src/Avalonia.DBus/ChannelsDBusWireConnection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Avalonia.DBus.Managed;

namespace Avalonia.DBus;

/// <summary>
/// An <see cref="IDBusWireConnection"/> implementation that exchanges
/// <see cref="DBusSerializedMessage"/> values through channels, assigns
/// request serials, and correlates replies on a background receive loop.
/// </summary>
#if !AVDBUS_INTERNAL
public
#endif
sealed class ChannelsDBusWireConnection : IDBusWireConnection
{
private readonly string? _uniqueName;
private readonly bool _isPeerToPeer;
private readonly IDBusMessageSerializer _serializer;
private readonly ChannelWriter<DBusSerializedMessage> _outboundWriter;
private readonly Channel<DBusMessage> _receiving;
private readonly ConcurrentDictionary<uint, TaskCompletionSource<DBusMessage>> _pendingReplies = new();
private readonly CancellationTokenSource _cts = new();
private readonly Task _receiveLoopTask;
private readonly Socket? _socket;
private readonly CancellationTokenSource? _transportCts;
private readonly IDBusDiagnostics? _diagnostics;

private uint _nextSerial;
private int _disposed;

/// <summary>
/// Creates a channel-backed wire connection.
/// </summary>
/// <param name="reader">Produces serialized inbound messages.</param>
/// <param name="writer">Consumes serialized outbound messages.</param>
/// <param name="socket">An optional socket to dispose with the connection.</param>
/// <param name="cts">An optional cancellation source controlling the underlying transport tasks.</param>
/// <param name="uniqueName">
/// The bus-assigned unique name for this connection, or <see langword="null"/> for peer-to-peer transports.
/// </param>
/// <param name="isPeerToPeer"><see langword="true"/> when the connection is not attached to a message bus.</param>
public ChannelsDBusWireConnection(
ChannelReader<DBusSerializedMessage> reader,
ChannelWriter<DBusSerializedMessage> writer,
Socket? socket = null,
CancellationTokenSource? cts = null,
string? uniqueName = null,
bool isPeerToPeer = true)
: this(reader, writer, socket, cts, uniqueName, isPeerToPeer, diagnostics: null, serializer: null)
{
}

/// <summary>
/// Creates a channel-backed wire connection with diagnostics hooks.
/// </summary>
/// <param name="reader">Produces serialized inbound messages.</param>
/// <param name="writer">Consumes serialized outbound messages.</param>
/// <param name="socket">An optional socket to dispose with the connection.</param>
/// <param name="cts">An optional cancellation source controlling the underlying transport tasks.</param>
/// <param name="uniqueName">
/// The bus-assigned unique name for this connection, or <see langword="null"/> for peer-to-peer transports.
/// </param>
/// <param name="isPeerToPeer"><see langword="true"/> when the connection is not attached to a message bus.</param>
/// <param name="diagnostics">Receives transport warnings such as malformed inbound messages.</param>
public ChannelsDBusWireConnection(
ChannelReader<DBusSerializedMessage> reader,
ChannelWriter<DBusSerializedMessage> writer,
Socket? socket,
CancellationTokenSource? cts,
string? uniqueName,
bool isPeerToPeer,
IDBusDiagnostics? diagnostics)
: this(reader, writer, socket, cts, uniqueName, isPeerToPeer, diagnostics, serializer: null)
{
}

internal ChannelsDBusWireConnection(
ChannelReader<DBusSerializedMessage> reader,
ChannelWriter<DBusSerializedMessage> writer,
Socket? socket,
CancellationTokenSource? cts,
string? uniqueName,
bool isPeerToPeer,
IDBusDiagnostics? diagnostics,
IDBusMessageSerializer? serializer)
{
_uniqueName = uniqueName;
_isPeerToPeer = isPeerToPeer;
_socket = socket;
_transportCts = cts;
_outboundWriter = writer;
_diagnostics = diagnostics;
_serializer = serializer ?? new ManagedDBusMessageSerializer();
_receiving = Channel.CreateUnbounded<DBusMessage>(
new UnboundedChannelOptions { SingleReader = true, SingleWriter = false });

_receiveLoopTask = RunReceiveLoopAsync(reader, _cts.Token);
}

/// <summary>
/// Gets a value indicating whether this connection is peer-to-peer and therefore has no message bus.
/// </summary>
public bool IsPeerToPeer => _isPeerToPeer;

/// <summary>
/// Gets the reader for inbound messages that were not matched to a pending reply.
/// </summary>
public ChannelReader<DBusMessage> ReceivingReader => _receiving.Reader;

/// <summary>
/// Returns the bus-assigned unique name supplied at construction time, or <see langword="null"/> for peer-to-peer transports.
/// </summary>
public Task<string?> GetUniqueNameAsync() => Task.FromResult(_uniqueName);

/// <summary>
/// Serializes and sends a message without waiting for a reply.
/// </summary>
public Task SendAsync(DBusMessage message, CancellationToken cancellationToken = default)
{
ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposed) != 0, this);
cancellationToken.ThrowIfCancellationRequested();

if (message.Serial == 0)
message.Serial = GetNextSerial();

message.Sender ??= _uniqueName;

var serialized = _serializer.Serialize(message);
return _outboundWriter.WriteAsync(serialized, cancellationToken).AsTask();
}

/// <summary>
/// Serializes and sends a message, then waits for a reply whose reply serial matches the request serial.
/// </summary>
public async Task<DBusMessage> SendWithReplyAsync(DBusMessage message, CancellationToken cancellationToken = default)
{
ObjectDisposedException.ThrowIf(Volatile.Read(ref _disposed) != 0, this);
cancellationToken.ThrowIfCancellationRequested();

if (message.Serial == 0)
message.Serial = GetNextSerial();

message.Sender ??= _uniqueName;

var tcs = new TaskCompletionSource<DBusMessage>(TaskCreationOptions.RunContinuationsAsynchronously);
_pendingReplies[message.Serial] = tcs;

// Remove canceled waits from the pending-reply table.
CancellationTokenRegistration reg = default;
if (cancellationToken.CanBeCanceled)
{
reg = cancellationToken.Register(() =>
{
if (_pendingReplies.TryRemove(message.Serial, out var removed))
removed.TrySetCanceled(cancellationToken);
});

// Dispose the registration when the wait completes to avoid leaking long-lived tokens.
_ = tcs.Task.ContinueWith(_ => reg.Dispose(), TaskContinuationOptions.ExecuteSynchronously);
}

try
{
var serialized = _serializer.Serialize(message);
await _outboundWriter.WriteAsync(serialized, cancellationToken).ConfigureAwait(false);
}
catch
{
_pendingReplies.TryRemove(message.Serial, out _);
reg.Dispose();
throw;
}

return await tcs.Task.ConfigureAwait(false);
}

/// <summary>
/// Stops the receive loop, shuts down the underlying transport, and fails outstanding reply waiters.
/// </summary>
public async ValueTask DisposeAsync()
{
if (Interlocked.Exchange(ref _disposed, 1) != 0)
return;

_transportCts?.Cancel();

_cts.Cancel();

try
{
await _receiveLoopTask.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}

_outboundWriter.TryComplete();

_socket?.Dispose();

_transportCts?.Dispose();
_cts.Dispose();

// Fail outstanding reply waiters after transport shutdown.
var disposedEx = new ObjectDisposedException(nameof(ChannelsDBusWireConnection));
foreach (var kvp in _pendingReplies)
{
if (_pendingReplies.TryRemove(kvp.Key, out var tcs))
tcs.TrySetException(disposedEx);
}

_receiving.Writer.TryComplete();
}

private async Task RunReceiveLoopAsync(ChannelReader<DBusSerializedMessage> reader, CancellationToken cancellationToken)
{
try
{
await foreach (var serialized in reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
DBusMessage message;
try
{
message = _serializer.Deserialize(serialized);
}
catch (Exception ex) when (ex is InvalidDataException
or NotSupportedException
or InvalidOperationException
or FormatException)
{
DBusTransportLog.MalformedMessageSkipped(_diagnostics, ex);
continue;
}

if (message.Type is DBusMessageType.MethodReturn or DBusMessageType.Error
&& message.ReplySerial != 0
&& _pendingReplies.TryRemove(message.ReplySerial, out var pendingTcs))
{
pendingTcs.TrySetResult(message);
}
else
{
await _receiving.Writer.WriteAsync(message, cancellationToken).ConfigureAwait(false);
}
}

if (!cancellationToken.IsCancellationRequested && Volatile.Read(ref _disposed) == 0)
DBusTransportLog.InboundTransportCompleted(_diagnostics);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// Normal shutdown
}
catch (ChannelClosedException)
{
// The inbound channel was completed — normal shutdown
}
}

private uint GetNextSerial()
{
uint serial;
do
{
serial = Interlocked.Increment(ref _nextSerial);
} while (serial == 0);

return serial;
}
}
16 changes: 8 additions & 8 deletions src/Avalonia.DBus/DBusConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,25 @@ public DBusConnection(IDBusWireConnection wire, IDBusDiagnostics? diagnostics =
}

/// <summary>
/// Connects to a D-Bus bus at the specified address.
/// Connects to a D-Bus address or one of the built-in <c>session</c>/<c>system</c> aliases.
/// </summary>
public static async Task<DBusConnection> ConnectAsync(
string address,
CancellationToken cancellationToken = default)
{
var wire = await DBusWireConnection.ConnectAsync(address, cancellationToken).ConfigureAwait(false);
var wire = await LibDBusWireConnection.ConnectAsync(address, cancellationToken).ConfigureAwait(false);
return new DBusConnection(wire, diagnostics: null);
}

/// <summary>
/// Connects to a D-Bus bus at the specified address.
/// Connects to a D-Bus address or one of the built-in <c>session</c>/<c>system</c> aliases.
/// </summary>
public static async Task<DBusConnection> ConnectAsync(
string address,
IDBusDiagnostics? diagnostics,
CancellationToken cancellationToken = default)
{
var wire = await DBusWireConnection.ConnectAsync(address, diagnostics, cancellationToken)
var wire = await LibDBusWireConnection.ConnectAsync(address, diagnostics, cancellationToken)
.ConfigureAwait(false);
return new DBusConnection(wire, diagnostics);
}
Expand All @@ -70,7 +70,7 @@ public static async Task<DBusConnection> ConnectAsync(
public static async Task<DBusConnection> ConnectSessionAsync(
CancellationToken cancellationToken = default)
{
var wire = await DBusWireConnection.ConnectSessionAsync(cancellationToken).ConfigureAwait(false);
var wire = await LibDBusWireConnection.ConnectSessionAsync(cancellationToken).ConfigureAwait(false);
return new DBusConnection(wire, diagnostics: null);
}

Expand All @@ -81,7 +81,7 @@ public static async Task<DBusConnection> ConnectSessionAsync(
IDBusDiagnostics? diagnostics,
CancellationToken cancellationToken = default)
{
var wire = await DBusWireConnection.ConnectSessionAsync(diagnostics, cancellationToken)
var wire = await LibDBusWireConnection.ConnectSessionAsync(diagnostics, cancellationToken)
.ConfigureAwait(false);
return new DBusConnection(wire, diagnostics);
}
Expand All @@ -92,7 +92,7 @@ public static async Task<DBusConnection> ConnectSessionAsync(
public static async Task<DBusConnection> ConnectSystemAsync(
CancellationToken cancellationToken = default)
{
var wire = await DBusWireConnection.ConnectSystemAsync(cancellationToken).ConfigureAwait(false);
var wire = await LibDBusWireConnection.ConnectSystemAsync(cancellationToken).ConfigureAwait(false);
return new DBusConnection(wire, diagnostics: null);
}

Expand All @@ -103,7 +103,7 @@ public static async Task<DBusConnection> ConnectSystemAsync(
IDBusDiagnostics? diagnostics,
CancellationToken cancellationToken = default)
{
var wire = await DBusWireConnection.ConnectSystemAsync(diagnostics, cancellationToken)
var wire = await LibDBusWireConnection.ConnectSystemAsync(diagnostics, cancellationToken)
.ConfigureAwait(false);
return new DBusConnection(wire, diagnostics);
}
Expand Down
12 changes: 12 additions & 0 deletions src/Avalonia.DBus/DBusSerializedMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Avalonia.DBus;

/// <summary>
/// Represents a D-Bus message in wire format together with any ancillary Unix file descriptors
/// that were transported outside the byte payload.
/// </summary>
/// <param name="Message">The serialized D-Bus message bytes, including the header and body.</param>
/// <param name="Fds">Ancillary Unix file descriptors referenced by the serialized payload.</param>
#if !AVDBUS_INTERNAL
public
#endif
record DBusSerializedMessage(byte[] Message, int[] Fds);
27 changes: 27 additions & 0 deletions src/Avalonia.DBus/DBusTransportLog.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System;

namespace Avalonia.DBus;

internal static class DBusTransportLog
{
public static void UnsupportedUnixFdTransport(IDBusDiagnostics? diagnostics, int fdCount)
=> diagnostics?.Log(
DBusLogLevel.Warning,
$"Ignoring {fdCount} D-Bus Unix file descriptor(s) on the managed socket transport " +
"because .NET sockets do not currently expose sendmsg/recvmsg SCM_RIGHTS APIs.");

public static void MalformedMessageSkipped(IDBusDiagnostics? diagnostics, Exception exception)
=> diagnostics?.Log(
DBusLogLevel.Warning,
$"Skipping malformed D-Bus message: {exception.Message}");

public static void SocketTransportStopped(IDBusDiagnostics? diagnostics, string direction, Exception exception)
=> diagnostics?.Log(
DBusLogLevel.Warning,
$"D-Bus socket transport {direction} stopped: {exception.GetType().Name}: {exception.Message}");

public static void InboundTransportCompleted(IDBusDiagnostics? diagnostics)
=> diagnostics?.Log(
DBusLogLevel.Warning,
"Inbound D-Bus transport completed; the connection will stop receiving messages until it is disposed.");
}
Loading