Skip to content

Commit 37de317

Browse files
feat(akka): capture Akka.NET DeadLetters into the Bowire stream
BowireAkkaExtension now subscribes a tiny internal listener actor to the actor system's EventStream for DeadLetter notifications and republishes them through the same broadcast channel as mailbox taps. A new optional IsDeadLetter flag on TappedMessage (default false, backwards-compatible) lets the Bowire UI distinguish undeliverable messages without extra plumbing. The subscription is torn down via RegisterOnTermination when the system shuts down. Adds BowireDeadLetterTests covering the EventStream path: stop an actor, Tell to its now-dead ref, assert the subscriber sees a TappedMessage with IsDeadLetter=true and the recipient set to the system's dead-letters path. Bumps version to 0.10.0 and moves DeadLetters out of the roadmap into "What it does".
1 parent bdcefe0 commit 37de317

5 files changed

Lines changed: 186 additions & 10 deletions

File tree

Directory.Build.props

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@
3838
</PropertyGroup>
3939

4040
<PropertyGroup Condition="'$(IsPackable)' != 'false'">
41-
<Version>0.9.4</Version>
42-
<AssemblyVersion>0.9.4</AssemblyVersion>
43-
<FileVersion>0.9.4</FileVersion>
41+
<Version>0.10.0</Version>
42+
<AssemblyVersion>0.10.0</AssemblyVersion>
43+
<FileVersion>0.10.0</FileVersion>
4444
</PropertyGroup>
4545

4646
<PropertyGroup Condition="$(MSBuildProjectName.StartsWith('Kuestenlogik.Bowire.Protocol.'))">

README.md

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ Bowire protocol plugin for **[Akka.NET](https://getakka.net/)** actor systems. S
99

1010
- **Mailbox tap** — a custom Akka.NET `MailboxType` (`BowireTapMailbox`) wraps the standard unbounded queue and forwards every enqueue to a per-actor-system extension. Opt-in globally (default mailbox swap) or per-actor (`Props.WithMailbox(...)`).
1111
- **`IExtension` integration**`BowireAkkaExtension` owns the broadcast channel and the active subscriber list. Steady-state cost when nobody's watching: one volatile read per message.
12-
- **Bowire streaming pane**`BowireAkkaProtocol` exposes one server-streaming method, `Tap/MonitorMessages`, that yields `TappedMessage` envelopes (recipient path, sender path, CLR type, payload, timestamp) as JSON.
12+
- **DeadLetters capture** — the extension subscribes to the actor system's `EventStream` and republishes every `Akka.Event.DeadLetter` through the same channel with `TappedMessage.IsDeadLetter = true`, so undeliverable messages surface in the Bowire stream without any per-actor opt-in.
13+
- **Bowire streaming pane**`BowireAkkaProtocol` exposes one server-streaming method, `Tap/MonitorMessages`, that yields `TappedMessage` envelopes (recipient path, sender path, CLR type, payload, timestamp, dead-letter flag) as JSON.
1314

1415
## Install
1516

@@ -66,9 +67,10 @@ dotnet run --project samples/Kuestenlogik.Bowire.Protocol.Akka.Sample
6667

6768
## Roadmap
6869

69-
- **0.1.0** (current) — embedded mode, `EventStream`-style mailbox tap, JSON envelope of recipient/sender/type/payload/timestamp.
70-
- **0.2.0** — external `Akka.Cluster.Tools.ClusterClient` transport so the standalone `bowire` CLI can attach to a running cluster, mailbox-snapshot inspection (size, head messages), `DeadLetters` capture, per-actor throughput stats.
71-
- **0.3.0** — typed payload via Akka serializer roundtrip, opt-in filter API from the Bowire UI (per actor path, per message type), Tell-from-Bowire (interactive duplex).
70+
- **0.1.0** — embedded mode, `EventStream`-style mailbox tap, JSON envelope of recipient/sender/type/payload/timestamp.
71+
- **0.2.0** (current) — `DeadLetters` capture via `EventStream` subscription with `IsDeadLetter` flag on the envelope.
72+
- **0.3.0** — external `Akka.Cluster.Tools.ClusterClient` transport so the standalone `bowire` CLI can attach to a running cluster, mailbox-snapshot inspection (size, head messages), per-actor throughput stats.
73+
- **0.4.0** — typed payload via Akka serializer roundtrip, opt-in filter API from the Bowire UI (per actor path, per message type), Tell-from-Bowire (interactive duplex).
7274

7375
## License
7476

src/Kuestenlogik.Bowire.Protocol.Akka/BowireAkkaExtension.cs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System.Threading.Channels;
55
using Akka.Actor;
6+
using Akka.Event;
67

78
namespace Kuestenlogik.Bowire.Protocol.Akka;
89

@@ -17,6 +18,13 @@ namespace Kuestenlogik.Bowire.Protocol.Akka;
1718
/// <see cref="ChannelReader{T}"/> that receives every tap from now on.
1819
/// Multiple subscribers each get their own reader — no fan-out coupling.
1920
/// </para>
21+
/// <para>
22+
/// The extension also subscribes to the actor system's
23+
/// <see cref="EventStream"/> for <see cref="DeadLetter"/> events and
24+
/// republishes them through the same channel with
25+
/// <see cref="TappedMessage.IsDeadLetter"/> set, so undeliverable messages
26+
/// surface in the Bowire stream without any per-actor opt-in.
27+
/// </para>
2028
/// </summary>
2129
/// <remarks>
2230
/// The tap mailbox queries this extension on every enqueue; if no
@@ -28,13 +36,40 @@ public sealed class BowireAkkaExtension : IExtension
2836
{
2937
private readonly object _lock = new();
3038
private readonly List<Channel<TappedMessage>> _subscribers = [];
39+
private readonly IActorRef _deadLetterListener;
40+
private readonly string _deadLetterPath;
3141

3242
/// <summary>The actor system this extension instance belongs to.</summary>
3343
public ExtendedActorSystem System { get; }
3444

3545
internal BowireAkkaExtension(ExtendedActorSystem system)
3646
{
3747
System = system;
48+
_deadLetterPath = system.DeadLetters.Path.ToString();
49+
50+
// Subscribe a lightweight internal actor to the EventStream for
51+
// DeadLetter notifications. We use an actor (rather than a raw
52+
// delegate) because Akka.NET's EventStream API is actor-based —
53+
// ActorOf gives us automatic lifecycle handling.
54+
_deadLetterListener = system.SystemActorOf(
55+
Props.Create(() => new DeadLetterListener(this)),
56+
"bowire-deadletter-listener");
57+
system.EventStream.Subscribe(_deadLetterListener, typeof(DeadLetter));
58+
59+
// Tear down the subscription when the system shuts down so we
60+
// don't leak the EventStream registration. IExtension has no
61+
// dispose hook, so RegisterOnTermination is the standard knob.
62+
system.RegisterOnTermination(() =>
63+
{
64+
try
65+
{
66+
System.EventStream.Unsubscribe(_deadLetterListener, typeof(DeadLetter));
67+
}
68+
catch
69+
{
70+
// Best-effort cleanup during shutdown; swallow.
71+
}
72+
});
3873
}
3974

4075
/// <summary>
@@ -105,6 +140,60 @@ internal void Publish(TappedMessage msg)
105140
ch.Writer.TryWrite(msg);
106141
}
107142
}
143+
144+
/// <summary>
145+
/// Convert a <see cref="DeadLetter"/> from the actor system's
146+
/// <see cref="EventStream"/> into a <see cref="TappedMessage"/> with
147+
/// <see cref="TappedMessage.IsDeadLetter"/> set, then fan-out via the
148+
/// regular <see cref="Publish"/> path. Wrapped in try/catch because a
149+
/// broken <see cref="object.ToString"/> on a payload must never break
150+
/// the listener actor.
151+
/// </summary>
152+
internal void PublishDeadLetter(DeadLetter deadLetter)
153+
{
154+
try
155+
{
156+
var msg = deadLetter.Message;
157+
Publish(new TappedMessage(
158+
Recipient: _deadLetterPath,
159+
Sender: deadLetter.Sender?.Path?.ToString() ?? string.Empty,
160+
MessageType: msg?.GetType().FullName ?? "<null>",
161+
Payload: msg?.ToString() ?? string.Empty,
162+
Timestamp: DateTime.UtcNow,
163+
IsDeadLetter: true));
164+
}
165+
catch
166+
{
167+
// Diagnostics must never crash the actor system. Swallow.
168+
}
169+
}
170+
171+
#pragma warning disable CA1812 // Instantiated by Akka via Props.Create
172+
/// <summary>
173+
/// Tiny internal actor that bridges the <see cref="EventStream"/>'s
174+
/// actor-based subscription API back into the
175+
/// <see cref="BowireAkkaExtension"/>'s plain method call. Holding a
176+
/// reference to the extension is fine: the extension's lifetime is the
177+
/// actor system's.
178+
/// </summary>
179+
private sealed class DeadLetterListener : UntypedActor
180+
{
181+
private readonly BowireAkkaExtension _extension;
182+
183+
public DeadLetterListener(BowireAkkaExtension extension)
184+
{
185+
_extension = extension;
186+
}
187+
188+
protected override void OnReceive(object message)
189+
{
190+
if (message is DeadLetter dl)
191+
{
192+
_extension.PublishDeadLetter(dl);
193+
}
194+
}
195+
}
196+
#pragma warning restore CA1812
108197
}
109198

110199
/// <summary>

src/Kuestenlogik.Bowire.Protocol.Akka/TappedMessage.cs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ namespace Kuestenlogik.Bowire.Protocol.Akka;
77

88
/// <summary>
99
/// One observation of a message landing in (or sent to) an actor's mailbox.
10-
/// Captured by <see cref="BowireTapMessageQueue"/> on enqueue and
11-
/// forwarded to <see cref="BowireAkkaExtension"/>'s broadcast channel.
10+
/// Captured by <see cref="BowireTapMessageQueue"/> on enqueue — or by the
11+
/// <see cref="BowireAkkaExtension"/>'s <c>DeadLetter</c> subscription on
12+
/// undelivered envelopes — and forwarded to the broadcast channel.
1213
/// </summary>
1314
/// <param name="Recipient">Absolute actor path of the receiver.</param>
1415
/// <param name="Sender">Absolute path of the sender, or <c>deadLetters</c>.</param>
@@ -19,9 +20,17 @@ namespace Kuestenlogik.Bowire.Protocol.Akka;
1920
/// or System.Text.Json for richer inspection.
2021
/// </param>
2122
/// <param name="Timestamp">UTC timestamp of the enqueue.</param>
23+
/// <param name="IsDeadLetter">
24+
/// True when this observation came from the actor system's
25+
/// <c>Akka.Event.EventStream</c> as an <c>Akka.Event.DeadLetter</c> rather
26+
/// than from a tapped mailbox enqueue. Lets the Bowire UI style
27+
/// undeliverable messages distinctly. Defaults to <c>false</c> for
28+
/// backwards compatibility.
29+
/// </param>
2230
public sealed record TappedMessage(
2331
string Recipient,
2432
string Sender,
2533
string MessageType,
2634
string Payload,
27-
DateTime Timestamp);
35+
DateTime Timestamp,
36+
bool IsDeadLetter = false);
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright 2026 Küstenlogik
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
using Akka.Actor;
5+
6+
namespace Kuestenlogik.Bowire.Protocol.Akka.Tests;
7+
8+
public sealed class BowireDeadLetterTests
9+
{
10+
[Fact]
11+
public async Task Subscriber_ReceivesDeadLetter_FromEventStream()
12+
{
13+
using var system = ActorSystem.Create("test-deadletter");
14+
var ext = BowireAkkaExtensionProvider.Instance.Apply((ExtendedActorSystem)system);
15+
16+
var reader = ext.Subscribe(out var token);
17+
try
18+
{
19+
// Tell to a terminated actor → Akka publishes a DeadLetter on
20+
// the EventStream. We stop the actor and then send to it; the
21+
// mailbox is gone so the message can't be delivered.
22+
var doomed = system.ActorOf(NoopActor.Build(), "doomed");
23+
await doomed.GracefulStop(TimeSpan.FromSeconds(2));
24+
doomed.Tell("orphan-message");
25+
26+
// Read messages until we find one flagged as a dead letter
27+
// (the gracefulStop poison-pill itself doesn't end up as a
28+
// dead letter, but defensive filtering keeps the assertion
29+
// crisp).
30+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
31+
TappedMessage? deadLetter = null;
32+
while (!cts.IsCancellationRequested)
33+
{
34+
var got = await reader.ReadAsync(cts.Token);
35+
if (got.IsDeadLetter && got.Payload == "orphan-message")
36+
{
37+
deadLetter = got;
38+
break;
39+
}
40+
}
41+
42+
Assert.NotNull(deadLetter);
43+
Assert.True(deadLetter!.IsDeadLetter);
44+
Assert.Equal("System.String", deadLetter.MessageType);
45+
Assert.Equal("orphan-message", deadLetter.Payload);
46+
Assert.Equal(system.DeadLetters.Path.ToString(), deadLetter.Recipient);
47+
}
48+
finally
49+
{
50+
ext.Unsubscribe(token);
51+
}
52+
}
53+
54+
[Fact]
55+
public void RegularTappedMessage_HasIsDeadLetterFalseByDefault()
56+
{
57+
// Backwards-compatibility check — the new optional parameter
58+
// defaults to false so callers from before 0.2.0 keep working.
59+
var msg = new TappedMessage(
60+
Recipient: "akka://test/user/foo",
61+
Sender: "akka://test/user/bar",
62+
MessageType: "System.String",
63+
Payload: "hi",
64+
Timestamp: DateTime.UtcNow);
65+
66+
Assert.False(msg.IsDeadLetter);
67+
}
68+
69+
#pragma warning disable CA1812 // Akka instantiates these via reflection in Props.Create<T>()
70+
private sealed class NoopActor : UntypedActor
71+
{
72+
public static Props Build() => global::Akka.Actor.Props.Create<NoopActor>();
73+
protected override void OnReceive(object message) { /* no-op */ }
74+
}
75+
#pragma warning restore CA1812
76+
}

0 commit comments

Comments
 (0)