Skip to content

Commit 4e52250

Browse files
committed
feat(adapters): align channel adapter with RFC 1
1 parent 0093388 commit 4e52250

6 files changed

Lines changed: 376 additions & 21 deletions

File tree

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,11 @@ usable as an `IEventContext`. The v1 channel adapter is in-process and
206206
object-only; it forwards existing typed envelopes through
207207
`System.Threading.Channels` and does not serialize payloads. Default
208208
`ChannelPipe` channels are unbounded and intended for local composition, tests,
209-
and same-process boundaries, not as a throughput or backpressure policy.
209+
and same-process boundaries, not as a throughput or backpressure policy. Both
210+
`Eventa` and `Eventa.Adapters` enable `IsAotCompatible=true`; the current
211+
Release builds run cleanly under the trim and Native AOT analyzers, and the
212+
channel adapter keeps its transport path free of reflection, dynamic dispatch,
213+
and runtime generic construction.
210214

211215
## Project Layout
212216

src/Eventa.Adapters/Channels/ChannelAdapter.cs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,28 @@ public void OnSent(string eventId, object? envelope, object? options = null)
2525
$"Event '{eventId}' was sent with an envelope that does not implement {nameof(IEventEnvelope)}.");
2626
}
2727

28-
if (!outbound.TryWrite(new ChannelMessage(eventEnvelope, options)))
28+
var message = new ChannelMessage(eventEnvelope, options);
29+
if (outbound.TryWrite(message)) return;
30+
31+
// Keep Emit synchronous: probe terminal state vs. transient pressure without waiting.
32+
var waitToWrite = outbound.WaitToWriteAsync();
33+
if (!waitToWrite.IsCompleted)
2934
{
30-
throw new ChannelClosedException("Channel endpoint closed.");
35+
throw new InvalidOperationException("Outbound channel could not accept the message immediately.");
3136
}
37+
38+
// WaitToWriteAsync(true) only reports a writable window; another writer can still win
39+
// that race before we claim the slot, so we need one more non-blocking write attempt.
40+
if (waitToWrite.IsCompletedSuccessfully && waitToWrite.Result && outbound.TryWrite(message)) return;
41+
42+
if (waitToWrite.IsCompletedSuccessfully && waitToWrite.Result)
43+
{
44+
throw new InvalidOperationException("Outbound channel could not accept the message immediately.");
45+
}
46+
47+
// A synchronously completed false/faulted wait means the writer is terminal. Use WriteAsync
48+
// here so completed/faulted channels surface their original exception shape unchanged.
49+
outbound.WriteAsync(message).AsTask().GetAwaiter().GetResult();
3250
}
3351

3452
/// <inheritdoc />

src/Eventa.Adapters/Channels/ChannelEndpoint.cs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ public void Dispose()
126126
/// <returns>A task that completes when the inbound channel closes or the endpoint terminates.</returns>
127127
private async Task RunInboundPumpAsync()
128128
{
129+
var completeOutbound = _options.CompleteOutboundOnDispose;
130+
129131
try
130132
{
131133
await foreach (var message in _inbound.ReadAllAsync(_disposeCancellation.Token).ConfigureAwait(false))
@@ -138,15 +140,12 @@ private async Task RunInboundPumpAsync()
138140
_context.Receive(message.Envelope, message.Options);
139141
}
140142

141-
Terminate(
142-
new ChannelClosedException("Channel closed."),
143-
completeOutbound: true,
144-
cancelInbound: false);
143+
Terminate(new ChannelClosedException("Channel closed."), completeOutbound, cancelInbound: false);
145144
}
146145
catch (OperationCanceledException) when (Volatile.Read(ref _terminalError) is not null) { }
147146
catch (Exception error)
148147
{
149-
Terminate(error, completeOutbound: true, cancelInbound: false, outboundError: error);
148+
Terminate(error, completeOutbound, cancelInbound: false, outboundError: error);
150149
}
151150
finally
152151
{
@@ -180,7 +179,13 @@ private void Terminate(
180179

181180
if (cancelInbound)
182181
{
183-
_disposeCancellation.Cancel();
182+
try
183+
{
184+
// The inbound pump disposes this CTS in its finally block, so a later local Dispose
185+
// can race with that cleanup after terminal ownership has already been decided.
186+
_disposeCancellation.Cancel();
187+
}
188+
catch (ObjectDisposedException) { }
184189
}
185190

186191
_context.NotifyTransportFatal(error);

src/Eventa.Adapters/Channels/ChannelMessages.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,9 @@ namespace Eventa.Adapters.Channels;
55
/// <summary>
66
/// Carries one Eventa envelope and optional adapter metadata across an in-process channel.
77
/// </summary>
8-
/// <param name="Envelope">The already-created Eventa envelope, or <see langword="null"/> for a malformed message.</param>
8+
/// <param name="Envelope">The already-created Eventa envelope.</param>
99
/// <param name="Options">Optional adapter metadata associated with the envelope.</param>
10-
public sealed record ChannelMessage(
11-
IEventEnvelope? Envelope,
12-
object? Options = null);
10+
public sealed record ChannelMessage(IEventEnvelope Envelope, object? Options = null);
1311

1412
/// <summary>
1513
/// Payload emitted when a channel endpoint observes terminal transport closure.

src/Eventa/Support/AsyncSignalQueue.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ internal sealed class AsyncSignalQueue<T>
1515
{
1616
SingleReader = false,
1717
SingleWriter = false,
18-
AllowSynchronousContinuations = false,
18+
AllowSynchronousContinuations = true,
1919
};
2020

2121
private readonly Channel<T> _channel = Channel.CreateUnbounded<T>(ChannelOptions);

0 commit comments

Comments
 (0)