Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,11 @@ usable as an `IEventContext`. The v1 channel adapter is in-process and
object-only; it forwards existing typed envelopes through
`System.Threading.Channels` and does not serialize payloads. Default
`ChannelPipe` channels are unbounded and intended for local composition, tests,
and same-process boundaries, not as a throughput or backpressure policy.
and same-process boundaries, not as a throughput or backpressure policy. Both
`Eventa` and `Eventa.Adapters` enable `IsAotCompatible=true`; the current
Release builds run cleanly under the trim and Native AOT analyzers, and the
channel adapter keeps its transport path free of reflection, dynamic dispatch,
and runtime generic construction.

## Project Layout

Expand Down
19 changes: 10 additions & 9 deletions docs/rfcs/rfc-1-channel-adapter.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ RFC PR: https://github.com/moeru-ai/eventa.net/pull/18

Start date: 2026-05-09

Last reviewed: 2026-05-16
Last reviewed: 2026-05-23

## Summary

Expand Down Expand Up @@ -114,7 +114,7 @@ using var endpoint = new ChannelEndpoint(
outbound.Writer,
new ChannelEndpointOptions
{
CompleteOutboundOnDispose = true,
CompleteOutboundOnTerminal = true,
});

IEventContext context = endpoint;
Expand Down Expand Up @@ -158,7 +158,7 @@ public sealed class ChannelPipeOptions

public sealed class ChannelEndpointOptions
{
public bool CompleteOutboundOnDispose { get; init; }
public bool CompleteOutboundOnTerminal { get; init; }

public EventDefinition<ChannelClosedPayload> ClosedEvent { get; init; }
= ChannelEvents.Closed;
Expand Down Expand Up @@ -187,9 +187,10 @@ ownership is not configurable through `ChannelPipeOptions`. `ChannelPipeOptions`
applies `ClosedEvent` symmetrically to both pipe-created endpoints, `Left` and
`Right`.

Custom `ChannelEndpoint` instances respect `CompleteOutboundOnDispose`; when it
is `true`, disposal completes the externally supplied outbound writer, and when
it is `false`, writer completion is left to the external owner.
Custom `ChannelEndpoint` instances respect `CompleteOutboundOnTerminal`; when it
is `true`, endpoint terminal transitions complete the externally supplied
outbound writer, and when it is `false`, writer completion is left to the
external owner.

Recommended new core transport-facing abstractions:

Expand Down Expand Up @@ -480,7 +481,7 @@ transport fatal delivery path.

- treats local endpoint disposal as a local transport terminal condition
- stops accepting new outbound sends
- completes the outbound writer when `CompleteOutboundOnDispose` is `true`
- completes the outbound writer when `CompleteOutboundOnTerminal` is `true`
- cancels the inbound pump for local disposal
- runs deterministic transport fatal notification before disposing the context
- best-effort dispatches the configured closed event before disposing the context
Expand Down Expand Up @@ -535,7 +536,7 @@ Otherwise local endpoint disposal faults the session with

When outbound completion is owned, or when the external owner completes or faults
the writer, the paired endpoint observes disposal through channel completion and
uses the remote close or fault sequence above. If `CompleteOutboundOnDispose` is
uses the remote close or fault sequence above. If `CompleteOutboundOnTerminal` is
`false` and no external completion or fault occurs, the paired endpoint is not
guaranteed to observe local endpoint disposal.

Expand Down Expand Up @@ -635,7 +636,7 @@ diagnostics.
`InnerException`.
- Lifecycle: disposing one pipe-created endpoint completes its owned outbound
writer, so the paired endpoint observes channel completion; custom endpoint
disposal respects `CompleteOutboundOnDispose=true` and `false`; concurrent
disposal respects `CompleteOutboundOnTerminal=true` and `false`; concurrent
disposal and disposal through `IEventContext` are idempotent, do not
double-notify, and stop inbound loops.
- Post-terminal operations: user-initiated `Emit`, listener registration,
Expand Down
48 changes: 46 additions & 2 deletions src/Eventa.Adapters/Channels/ChannelAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,59 @@ public void OnSent(string eventId, object? envelope, object? options = null)
$"Event '{eventId}' was sent with an envelope that does not implement {nameof(IEventEnvelope)}.");
}

if (!outbound.TryWrite(new ChannelMessage(eventEnvelope, options)))
var message = new ChannelMessage(eventEnvelope, options);
if (outbound.TryWrite(message)) return;

// Keep Emit synchronous: probe terminal state vs. transient pressure without waiting.
// If the probe would block, cancel it so bounded channels don't retain an abandoned
// WaitToWriteAsync waiter after we've already decided to fail fast.
using var waitToWriteCancellation = new CancellationTokenSource();
var waitToWrite = outbound.WaitToWriteAsync(waitToWriteCancellation.Token);
if (!waitToWrite.IsCompleted)
{
throw new ChannelClosedException("Channel endpoint closed.");
CancelAndObservePendingWait(waitToWrite, waitToWriteCancellation);
throw new InvalidOperationException("Outbound channel could not accept the message immediately.");
}
Comment thread
Garfield550 marked this conversation as resolved.

// Short-circuit evaluation keeps ValueTask.Result consumed at most once, and only after
// the wait has completed successfully.
if (waitToWrite.IsCompletedSuccessfully && waitToWrite.Result)
{
// WaitToWriteAsync(true) only reports a writable window; another writer can still win
// that race before we claim the slot, so we need one more non-blocking write attempt.
if (outbound.TryWrite(message)) return;

throw new InvalidOperationException("Outbound channel could not accept the message immediately.");
}

// A synchronously completed false/faulted wait means the writer is terminal. Use WriteAsync
// here so completed/faulted channels surface their original exception shape unchanged.
outbound.WriteAsync(message).AsTask().GetAwaiter().GetResult();
}

/// <inheritdoc />
public void OnReceived(string eventId, object? envelope, object? options = null) { }

private static void CancelAndObservePendingWait(
ValueTask<bool> waitToWrite,
CancellationTokenSource waitToWriteCancellation)
{
// Consume the pending wait even after canceling it so pooled
// IValueTaskSource-backed channel waiters are not abandoned.
var waitToWriteTask = waitToWrite.AsTask();
waitToWriteCancellation.Cancel();

_ = waitToWriteTask.ContinueWith(
static task =>
{
if (!task.IsFaulted) return;
_ = task.Exception;
},
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
}

/// <inheritdoc />
public void Dispose()
{
Expand Down
19 changes: 12 additions & 7 deletions src/Eventa.Adapters/Channels/ChannelEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void Dispose()
{
Terminate(
new ChannelClosedException("Channel endpoint disposed."),
completeOutbound: _options.CompleteOutboundOnDispose,
completeOutbound: _options.CompleteOutboundOnTerminal,
cancelInbound: true);
}

Expand All @@ -126,6 +126,8 @@ public void Dispose()
/// <returns>A task that completes when the inbound channel closes or the endpoint terminates.</returns>
private async Task RunInboundPumpAsync()
{
var completeOutbound = _options.CompleteOutboundOnTerminal;

try
{
await foreach (var message in _inbound.ReadAllAsync(_disposeCancellation.Token).ConfigureAwait(false))
Expand All @@ -138,15 +140,12 @@ private async Task RunInboundPumpAsync()
_context.Receive(message.Envelope, message.Options);
}

Terminate(
new ChannelClosedException("Channel closed."),
completeOutbound: true,
cancelInbound: false);
Terminate(new ChannelClosedException("Channel closed."), completeOutbound, cancelInbound: false);
}
catch (OperationCanceledException) when (Volatile.Read(ref _terminalError) is not null) { }
catch (Exception error)
{
Terminate(error, completeOutbound: true, cancelInbound: false, outboundError: error);
Terminate(error, completeOutbound, cancelInbound: false, outboundError: error);
}
finally
{
Expand Down Expand Up @@ -180,7 +179,13 @@ private void Terminate(

if (cancelInbound)
{
_disposeCancellation.Cancel();
try
{
// The inbound pump disposes this CTS in its finally block, so a later local Dispose
// can race with that cleanup after terminal ownership has already been decided.
_disposeCancellation.Cancel();
}
catch (ObjectDisposedException) { }
}

_context.NotifyTransportFatal(error);
Expand Down
6 changes: 2 additions & 4 deletions src/Eventa.Adapters/Channels/ChannelMessages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ namespace Eventa.Adapters.Channels;
/// <summary>
/// Carries one Eventa envelope and optional adapter metadata across an in-process channel.
/// </summary>
/// <param name="Envelope">The already-created Eventa envelope, or <see langword="null"/> for a malformed message.</param>
/// <param name="Envelope">The already-created Eventa envelope.</param>
/// <param name="Options">Optional adapter metadata associated with the envelope.</param>
public sealed record ChannelMessage(
IEventEnvelope? Envelope,
object? Options = null);
public sealed record ChannelMessage(IEventEnvelope Envelope, object? Options = null);

/// <summary>
/// Payload emitted when a channel endpoint observes terminal transport closure.
Expand Down
5 changes: 3 additions & 2 deletions src/Eventa.Adapters/Channels/ChannelOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ public sealed class ChannelPipeOptions
public sealed class ChannelEndpointOptions
{
/// <summary>
/// Gets whether endpoint disposal completes the supplied outbound writer.
/// Gets whether endpoint terminal transitions complete the supplied outbound writer,
/// including local disposal and observed inbound close or fault.
/// </summary>
public bool CompleteOutboundOnDispose { get; init; }
public bool CompleteOutboundOnTerminal { get; init; }

/// <summary>
/// Gets the public event emitted when the endpoint observes terminal closure.
Expand Down
2 changes: 1 addition & 1 deletion src/Eventa.Adapters/Channels/ChannelPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public ChannelPipe(ChannelPipeOptions? options = null)
var rightToLeft = Channel.CreateUnbounded<ChannelMessage>(ChannelOptions);
var endpointOptions = new ChannelEndpointOptions
{
CompleteOutboundOnDispose = true,
CompleteOutboundOnTerminal = true,
ClosedEvent = options.ClosedEvent,
};

Expand Down
5 changes: 4 additions & 1 deletion src/Eventa/Support/AsyncSignalQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ internal sealed class AsyncSignalQueue<T>
{
SingleReader = false,
SingleWriter = false,
AllowSynchronousContinuations = false,
// Stream invoke sessions surface transport-fatal completion through this queue. Running
// wait continuations inline keeps a pending MoveNextAsync faulted before channel adapters
// continue on to their public closed-event dispatch, which is the RFC 1 ordering contract.
AllowSynchronousContinuations = true,
Comment thread
Garfield550 marked this conversation as resolved.
};

private readonly Channel<T> _channel = Channel.CreateUnbounded<T>(ChannelOptions);
Expand Down
Loading
Loading