diff --git a/README.md b/README.md index 9ce3f5b..e6be8e2 100644 --- a/README.md +++ b/README.md @@ -206,7 +206,11 @@ usable as an `IEventContext`. The v1 channel adapter is in-process and object-only; it forwards existing typed envelopes through `System.Threading.Channels` and does not serialize payloads. Default `ChannelPipe` channels are unbounded and intended for local composition, tests, -and same-process boundaries, not as a throughput or backpressure policy. +and same-process boundaries, not as a throughput or backpressure policy. Both +`Eventa` and `Eventa.Adapters` enable `IsAotCompatible=true`; the current +Release builds run cleanly under the trim and Native AOT analyzers, and the +channel adapter keeps its transport path free of reflection, dynamic dispatch, +and runtime generic construction. ## Project Layout diff --git a/docs/rfcs/rfc-1-channel-adapter.md b/docs/rfcs/rfc-1-channel-adapter.md index 5c2fe09..62c3dbe 100644 --- a/docs/rfcs/rfc-1-channel-adapter.md +++ b/docs/rfcs/rfc-1-channel-adapter.md @@ -6,7 +6,7 @@ RFC PR: https://github.com/moeru-ai/eventa.net/pull/18 Start date: 2026-05-09 -Last reviewed: 2026-05-16 +Last reviewed: 2026-05-23 ## Summary @@ -114,7 +114,7 @@ using var endpoint = new ChannelEndpoint( outbound.Writer, new ChannelEndpointOptions { - CompleteOutboundOnDispose = true, + CompleteOutboundOnTerminal = true, }); IEventContext context = endpoint; @@ -158,7 +158,7 @@ public sealed class ChannelPipeOptions public sealed class ChannelEndpointOptions { - public bool CompleteOutboundOnDispose { get; init; } + public bool CompleteOutboundOnTerminal { get; init; } public EventDefinition ClosedEvent { get; init; } = ChannelEvents.Closed; @@ -187,9 +187,10 @@ ownership is not configurable through `ChannelPipeOptions`. `ChannelPipeOptions` applies `ClosedEvent` symmetrically to both pipe-created endpoints, `Left` and `Right`. -Custom `ChannelEndpoint` instances respect `CompleteOutboundOnDispose`; when it -is `true`, disposal completes the externally supplied outbound writer, and when -it is `false`, writer completion is left to the external owner. +Custom `ChannelEndpoint` instances respect `CompleteOutboundOnTerminal`; when it +is `true`, endpoint terminal transitions complete the externally supplied +outbound writer, and when it is `false`, writer completion is left to the +external owner. Recommended new core transport-facing abstractions: @@ -480,7 +481,7 @@ transport fatal delivery path. - treats local endpoint disposal as a local transport terminal condition - stops accepting new outbound sends -- completes the outbound writer when `CompleteOutboundOnDispose` is `true` +- completes the outbound writer when `CompleteOutboundOnTerminal` is `true` - cancels the inbound pump for local disposal - runs deterministic transport fatal notification before disposing the context - best-effort dispatches the configured closed event before disposing the context @@ -535,7 +536,7 @@ Otherwise local endpoint disposal faults the session with When outbound completion is owned, or when the external owner completes or faults the writer, the paired endpoint observes disposal through channel completion and -uses the remote close or fault sequence above. If `CompleteOutboundOnDispose` is +uses the remote close or fault sequence above. If `CompleteOutboundOnTerminal` is `false` and no external completion or fault occurs, the paired endpoint is not guaranteed to observe local endpoint disposal. @@ -635,7 +636,7 @@ diagnostics. `InnerException`. - Lifecycle: disposing one pipe-created endpoint completes its owned outbound writer, so the paired endpoint observes channel completion; custom endpoint - disposal respects `CompleteOutboundOnDispose=true` and `false`; concurrent + disposal respects `CompleteOutboundOnTerminal=true` and `false`; concurrent disposal and disposal through `IEventContext` are idempotent, do not double-notify, and stop inbound loops. - Post-terminal operations: user-initiated `Emit`, listener registration, diff --git a/src/Eventa.Adapters/Channels/ChannelAdapter.cs b/src/Eventa.Adapters/Channels/ChannelAdapter.cs index 5125eb4..72f9e9b 100644 --- a/src/Eventa.Adapters/Channels/ChannelAdapter.cs +++ b/src/Eventa.Adapters/Channels/ChannelAdapter.cs @@ -25,15 +25,59 @@ public void OnSent(string eventId, object? envelope, object? options = null) $"Event '{eventId}' was sent with an envelope that does not implement {nameof(IEventEnvelope)}."); } - if (!outbound.TryWrite(new ChannelMessage(eventEnvelope, options))) + var message = new ChannelMessage(eventEnvelope, options); + if (outbound.TryWrite(message)) return; + + // Keep Emit synchronous: probe terminal state vs. transient pressure without waiting. + // If the probe would block, cancel it so bounded channels don't retain an abandoned + // WaitToWriteAsync waiter after we've already decided to fail fast. + using var waitToWriteCancellation = new CancellationTokenSource(); + var waitToWrite = outbound.WaitToWriteAsync(waitToWriteCancellation.Token); + if (!waitToWrite.IsCompleted) { - throw new ChannelClosedException("Channel endpoint closed."); + CancelAndObservePendingWait(waitToWrite, waitToWriteCancellation); + throw new InvalidOperationException("Outbound channel could not accept the message immediately."); + } + + // Short-circuit evaluation keeps ValueTask.Result consumed at most once, and only after + // the wait has completed successfully. + if (waitToWrite.IsCompletedSuccessfully && waitToWrite.Result) + { + // WaitToWriteAsync(true) only reports a writable window; another writer can still win + // that race before we claim the slot, so we need one more non-blocking write attempt. + if (outbound.TryWrite(message)) return; + + throw new InvalidOperationException("Outbound channel could not accept the message immediately."); } + + // A synchronously completed false/faulted wait means the writer is terminal. Use WriteAsync + // here so completed/faulted channels surface their original exception shape unchanged. + outbound.WriteAsync(message).AsTask().GetAwaiter().GetResult(); } /// public void OnReceived(string eventId, object? envelope, object? options = null) { } + private static void CancelAndObservePendingWait( + ValueTask waitToWrite, + CancellationTokenSource waitToWriteCancellation) + { + // Consume the pending wait even after canceling it so pooled + // IValueTaskSource-backed channel waiters are not abandoned. + var waitToWriteTask = waitToWrite.AsTask(); + waitToWriteCancellation.Cancel(); + + _ = waitToWriteTask.ContinueWith( + static task => + { + if (!task.IsFaulted) return; + _ = task.Exception; + }, + CancellationToken.None, + TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default); + } + /// public void Dispose() { diff --git a/src/Eventa.Adapters/Channels/ChannelEndpoint.cs b/src/Eventa.Adapters/Channels/ChannelEndpoint.cs index bf914bc..7dec260 100644 --- a/src/Eventa.Adapters/Channels/ChannelEndpoint.cs +++ b/src/Eventa.Adapters/Channels/ChannelEndpoint.cs @@ -116,7 +116,7 @@ public void Dispose() { Terminate( new ChannelClosedException("Channel endpoint disposed."), - completeOutbound: _options.CompleteOutboundOnDispose, + completeOutbound: _options.CompleteOutboundOnTerminal, cancelInbound: true); } @@ -126,6 +126,8 @@ public void Dispose() /// A task that completes when the inbound channel closes or the endpoint terminates. private async Task RunInboundPumpAsync() { + var completeOutbound = _options.CompleteOutboundOnTerminal; + try { await foreach (var message in _inbound.ReadAllAsync(_disposeCancellation.Token).ConfigureAwait(false)) @@ -138,15 +140,12 @@ private async Task RunInboundPumpAsync() _context.Receive(message.Envelope, message.Options); } - Terminate( - new ChannelClosedException("Channel closed."), - completeOutbound: true, - cancelInbound: false); + Terminate(new ChannelClosedException("Channel closed."), completeOutbound, cancelInbound: false); } catch (OperationCanceledException) when (Volatile.Read(ref _terminalError) is not null) { } catch (Exception error) { - Terminate(error, completeOutbound: true, cancelInbound: false, outboundError: error); + Terminate(error, completeOutbound, cancelInbound: false, outboundError: error); } finally { @@ -180,7 +179,13 @@ private void Terminate( if (cancelInbound) { - _disposeCancellation.Cancel(); + try + { + // The inbound pump disposes this CTS in its finally block, so a later local Dispose + // can race with that cleanup after terminal ownership has already been decided. + _disposeCancellation.Cancel(); + } + catch (ObjectDisposedException) { } } _context.NotifyTransportFatal(error); diff --git a/src/Eventa.Adapters/Channels/ChannelMessages.cs b/src/Eventa.Adapters/Channels/ChannelMessages.cs index aeb7698..e8db095 100644 --- a/src/Eventa.Adapters/Channels/ChannelMessages.cs +++ b/src/Eventa.Adapters/Channels/ChannelMessages.cs @@ -5,11 +5,9 @@ namespace Eventa.Adapters.Channels; /// /// Carries one Eventa envelope and optional adapter metadata across an in-process channel. /// -/// The already-created Eventa envelope, or for a malformed message. +/// The already-created Eventa envelope. /// Optional adapter metadata associated with the envelope. -public sealed record ChannelMessage( - IEventEnvelope? Envelope, - object? Options = null); +public sealed record ChannelMessage(IEventEnvelope Envelope, object? Options = null); /// /// Payload emitted when a channel endpoint observes terminal transport closure. diff --git a/src/Eventa.Adapters/Channels/ChannelOptions.cs b/src/Eventa.Adapters/Channels/ChannelOptions.cs index 59cf1d9..25bb784 100644 --- a/src/Eventa.Adapters/Channels/ChannelOptions.cs +++ b/src/Eventa.Adapters/Channels/ChannelOptions.cs @@ -19,9 +19,10 @@ public sealed class ChannelPipeOptions public sealed class ChannelEndpointOptions { /// - /// Gets whether endpoint disposal completes the supplied outbound writer. + /// Gets whether endpoint terminal transitions complete the supplied outbound writer, + /// including local disposal and observed inbound close or fault. /// - public bool CompleteOutboundOnDispose { get; init; } + public bool CompleteOutboundOnTerminal { get; init; } /// /// Gets the public event emitted when the endpoint observes terminal closure. diff --git a/src/Eventa.Adapters/Channels/ChannelPipe.cs b/src/Eventa.Adapters/Channels/ChannelPipe.cs index 8a46bfe..3ca3804 100644 --- a/src/Eventa.Adapters/Channels/ChannelPipe.cs +++ b/src/Eventa.Adapters/Channels/ChannelPipe.cs @@ -28,7 +28,7 @@ public ChannelPipe(ChannelPipeOptions? options = null) var rightToLeft = Channel.CreateUnbounded(ChannelOptions); var endpointOptions = new ChannelEndpointOptions { - CompleteOutboundOnDispose = true, + CompleteOutboundOnTerminal = true, ClosedEvent = options.ClosedEvent, }; diff --git a/src/Eventa/Support/AsyncSignalQueue.cs b/src/Eventa/Support/AsyncSignalQueue.cs index 4eed7f9..151ca46 100644 --- a/src/Eventa/Support/AsyncSignalQueue.cs +++ b/src/Eventa/Support/AsyncSignalQueue.cs @@ -15,7 +15,10 @@ internal sealed class AsyncSignalQueue { SingleReader = false, SingleWriter = false, - AllowSynchronousContinuations = false, + // Stream invoke sessions surface transport-fatal completion through this queue. Running + // wait continuations inline keeps a pending MoveNextAsync faulted before channel adapters + // continue on to their public closed-event dispatch, which is the RFC 1 ordering contract. + AllowSynchronousContinuations = true, }; private readonly Channel _channel = Channel.CreateUnbounded(ChannelOptions); diff --git a/tests/Eventa.Adapters.Tests/Channels/ChannelPipeTests.cs b/tests/Eventa.Adapters.Tests/Channels/ChannelPipeTests.cs index ef72b08..b2d3b04 100644 --- a/tests/Eventa.Adapters.Tests/Channels/ChannelPipeTests.cs +++ b/tests/Eventa.Adapters.Tests/Channels/ChannelPipeTests.cs @@ -1,10 +1,12 @@ using System.Reflection; using System.Runtime.CompilerServices; using System.Threading.Channels; +using System.Threading.Tasks.Sources; using Eventa.Adapters.Channels; using ChannelClosedException = Eventa.Adapters.Channels.ChannelClosedException; +using SystemChannelClosedException = System.Threading.Channels.ChannelClosedException; namespace Eventa.Adapters.Tests.Channels; @@ -135,6 +137,41 @@ public async Task Dispose_FaultsPendingUnaryInvokeBeforeClosedEvent() Assert.Equal("Channel endpoint disposed.", error.Message); } + [Fact] + public async Task Dispose_FaultsActiveStreamBeforeClosedEvent() + { + using var pipe = new ChannelPipe(); + var definition = new InvokeEventDefinition("channel:dispose-stream"); + var closedObserved = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + using var _ = pipe.Right.RegisterStreamHandler(definition, PendingAsync); + + var client = pipe.Left.CreateInvokeStreamClient(definition); + await using var enumerator = client.InvokeAsync(1, CancellationToken.None) + .GetAsyncEnumerator(TestContext.Current.CancellationToken); + var pendingMoveNext = enumerator.MoveNextAsync().AsTask(); + + using var __ = pipe.Left.Subscribe(ChannelEvents.Closed, _ => + { + if (pendingMoveNext.IsFaulted) + { + closedObserved.TrySetResult(true); + } + else + { + closedObserved.TrySetException(new InvalidOperationException("Closed event ran before stream faulted.")); + } + }); + + pipe.Left.Dispose(); + + var error = await Assert.ThrowsAsync( + async () => await pendingMoveNext.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken)); + await closedObserved.Task.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken); + + Assert.Equal("Channel endpoint disposed.", error.Message); + } + [Fact] public void Emit_AfterEndpointDisposed_FailsFast() { @@ -147,6 +184,67 @@ public void Emit_AfterEndpointDisposed_FailsFast() Assert.Equal("Channel endpoint disposed.", error.Message); } + [Fact] + public void ListenerOperations_AfterEndpointDisposed_FailFast() + { + using var pipe = new ChannelPipe(); + var definition = new EventDefinition("channel:post-terminal-listeners"); + var matchExpression = new MatchExpression("channel:post-terminal-match", _ => true); + Action> handler = _ => { }; + + pipe.Left.Dispose(); + + AssertDisposed(() => pipe.Left.Subscribe(definition, handler)); + AssertDisposed(() => pipe.Left.SubscribeOnce(definition, handler)); + AssertDisposed(() => pipe.Left.Unsubscribe(definition, handler)); + AssertDisposed(() => pipe.Left.Subscribe(matchExpression, handler)); + AssertDisposed(() => pipe.Left.SubscribeOnce(matchExpression, handler)); + AssertDisposed(() => pipe.Left.Unsubscribe(matchExpression, handler)); + } + + [Fact] + public void HandlerRegistration_AfterEndpointDisposed_FailsFast() + { + using var pipe = new ChannelPipe(); + var invokeDefinition = new InvokeEventDefinition("channel:post-terminal-invoke-handler"); + var streamDefinition = new InvokeEventDefinition("channel:post-terminal-stream-handler"); + + pipe.Left.Dispose(); + + AssertDisposed(() => pipe.Left.RegisterInvokeHandler( + invokeDefinition, + static (request, _) => Task.FromResult(request))); + AssertDisposed(() => pipe.Left.RegisterStreamHandler(streamDefinition, CountAsync)); + } + + [Fact] + public void CreateInvokeClient_AfterEndpointDisposed_FirstInvokeFailsFast() + { + using var pipe = new ChannelPipe(); + var definition = new InvokeEventDefinition("channel:post-terminal-client"); + + pipe.Left.Dispose(); + + var client = pipe.Left.CreateInvokeClient(definition); + Assert.NotNull(client); + + AssertDisposed(() => client.InvokeAsync("late", TestContext.Current.CancellationToken)); + } + + [Fact] + public void CreateInvokeStreamClient_AfterEndpointDisposed_FirstInvokeFailsFast() + { + using var pipe = new ChannelPipe(); + var definition = new InvokeEventDefinition("channel:post-terminal-stream-client"); + + pipe.Left.Dispose(); + + var client = pipe.Left.CreateInvokeStreamClient(definition); + Assert.NotNull(client); + + AssertDisposed(() => client.InvokeAsync(1, TestContext.Current.CancellationToken)); + } + [Fact] public async Task Dispose_WhenCalled_ReleasesInboundPumpCancellationSource() { @@ -173,7 +271,7 @@ public async Task InboundChannelCompletion_WhenObserved_ReleasesInboundPumpCance } [Fact] - public void Emit_WhenOutboundWriterCompleted_ThrowsChannelClosedException() + public void Emit_WhenOutboundWriterCompleted_PropagatesWriterFailureUnchanged() { var inbound = Channel.CreateUnbounded(); var outbound = Channel.CreateUnbounded(); @@ -182,12 +280,44 @@ public void Emit_WhenOutboundWriterCompleted_ThrowsChannelClosedException() outbound.Writer.TryComplete(); - var error = Assert.Throws(() => endpoint.Emit(definition, new TestPayload("late"))); - Assert.Equal("Channel endpoint closed.", error.Message); + var error = Assert.Throws(() => endpoint.Emit(definition, new TestPayload("late"))); + Assert.Null(error.InnerException); + } + + [Fact] + public void Emit_WhenOutboundWriterFaulted_PropagatesOriginalWriterFailure() + { + var inbound = Channel.CreateUnbounded(); + var outbound = Channel.CreateUnbounded(); + using var endpoint = new ChannelEndpoint(inbound.Reader, outbound.Writer); + var definition = new EventDefinition("channel:writer-faulted"); + var expected = new InvalidOperationException("writer failed"); + + outbound.Writer.TryComplete(expected); + + var error = Assert.Throws(() => endpoint.Emit(definition, new TestPayload("late"))); + Assert.Same(expected, error.InnerException); + } + + [Fact] + public async Task InvokeAsync_WhenOutboundWriterFaulted_PropagatesOriginalWriterFailure() + { + var inbound = Channel.CreateUnbounded(); + var outbound = Channel.CreateUnbounded(); + using var endpoint = new ChannelEndpoint(inbound.Reader, outbound.Writer); + var definition = new InvokeEventDefinition("channel:invoke-writer-faulted"); + var expected = new InvalidOperationException("writer failed"); + var client = endpoint.CreateInvokeClient(definition); + + outbound.Writer.TryComplete(expected); + + var error = await Assert.ThrowsAsync( + async () => await client.InvokeAsync("request", TestContext.Current.CancellationToken)); + Assert.Same(expected, error.InnerException); } [Fact] - public async Task Emit_WhenBoundedOutboundFull_ThrowsChannelClosedExceptionWithoutBlocking() + public async Task Emit_WhenBoundedOutboundFull_FailsFastWithoutBlocking() { var inbound = Channel.CreateUnbounded(); var outbound = Channel.CreateBounded(new BoundedChannelOptions(1) @@ -197,16 +327,51 @@ public async Task Emit_WhenBoundedOutboundFull_ThrowsChannelClosedExceptionWitho using var endpoint = new ChannelEndpoint( inbound.Reader, outbound.Writer, - new ChannelEndpointOptions { CompleteOutboundOnDispose = true }); + new ChannelEndpointOptions { CompleteOutboundOnTerminal = true }); var definition = new EventDefinition("channel:bounded-full"); endpoint.Emit(definition, new TestPayload("first")); var secondEmit = Task.Run( - () => Assert.Throws(() => endpoint.Emit(definition, new TestPayload("second"))), + () => Assert.Throws(() => endpoint.Emit(definition, new TestPayload("second"))), TestContext.Current.CancellationToken); var error = await secondEmit.WaitAsync(TimeSpan.FromMilliseconds(250), TestContext.Current.CancellationToken); - Assert.Equal("Channel endpoint closed.", error.Message); + Assert.Equal("Outbound channel could not accept the message immediately.", error.Message); + } + + [Fact] + public async Task Emit_WhenWriterSignalsWritableButRejectsTryWrite_FailsFastWithoutCallingWriteAsync() + { + var inbound = Channel.CreateUnbounded(); + var outbound = new ProbeRejectingWriter(); + using var endpoint = new ChannelEndpoint(inbound.Reader, outbound); + var definition = new EventDefinition("channel:writer-contention"); + + var emitTask = Task.Run( + () => Assert.Throws(() => endpoint.Emit(definition, new TestPayload("late"))), + TestContext.Current.CancellationToken); + + var error = await emitTask.WaitAsync(TimeSpan.FromMilliseconds(250), TestContext.Current.CancellationToken); + Assert.Equal("Outbound channel could not accept the message immediately.", error.Message); + Assert.Equal(0, Volatile.Read(ref outbound.WriteAsyncCalls)); + } + + [Fact] + public async Task Emit_WhenWaitToWriteWouldBlock_CancelsAndObservesPendingProbeBeforeThrowing() + { + var inbound = Channel.CreateUnbounded(); + var outbound = new ProbePendingWaitWriter(); + using var endpoint = new ChannelEndpoint(inbound.Reader, outbound); + var definition = new EventDefinition("channel:writer-wait-probe"); + + var error = Assert.Throws(() => endpoint.Emit(definition, new TestPayload("late"))); + + Assert.Equal("Outbound channel could not accept the message immediately.", error.Message); + Assert.Equal(1, Volatile.Read(ref outbound.WaitToWriteCalls)); + Assert.Equal(1, Volatile.Read(ref outbound.CanceledWaits)); + await outbound.ObservedTask.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken); + Assert.Equal(1, Volatile.Read(ref outbound.GetResultCalls)); + Assert.Equal(0, Volatile.Read(ref outbound.WriteAsyncCalls)); } [Fact] @@ -217,13 +382,86 @@ public async Task CustomEndpointDispose_WhenConfigured_CompletesOutboundForPaire using var left = new ChannelEndpoint( rightToLeft.Reader, leftToRight.Writer, - new ChannelEndpointOptions { CompleteOutboundOnDispose = true }); + new ChannelEndpointOptions { CompleteOutboundOnTerminal = true }); using var right = new ChannelEndpoint(leftToRight.Reader, rightToLeft.Writer); var payload = await WaitForClosedAsync(right, left.Dispose); var error = Assert.IsType(payload.Error); Assert.Equal("Channel closed.", error.Message); } + [Fact] + public async Task Dispose_WhenPipeAndEndpointDisposalsRace_EmitsClosedEventOncePerEndpoint() + { + using var pipe = new ChannelPipe(); + IEventContext leftContext = pipe.Left; + IEventContext rightContext = pipe.Right; + var leftClosed = CreateSingleClosedObserver(pipe.Left, "left"); + var rightClosed = CreateSingleClosedObserver(pipe.Right, "right"); + + await Task.WhenAll( + Task.Run(pipe.Dispose, TestContext.Current.CancellationToken), + Task.Run(leftContext.Dispose, TestContext.Current.CancellationToken), + Task.Run(rightContext.Dispose, TestContext.Current.CancellationToken)); + + await leftClosed.Observed.Task.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken); + await rightClosed.Observed.Task.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken); + + Assert.Equal(1, Volatile.Read(ref leftClosed.Count)); + Assert.Equal(1, Volatile.Read(ref rightClosed.Count)); + } + + [Fact] + public void CustomEndpointDispose_WhenNotConfigured_LeavesPairedEndpointUsable() + { + var leftToRight = Channel.CreateUnbounded(); + var rightToLeft = Channel.CreateUnbounded(); + using var left = new ChannelEndpoint( + rightToLeft.Reader, + leftToRight.Writer, + new ChannelEndpointOptions { CompleteOutboundOnTerminal = false }); + using var right = new ChannelEndpoint(leftToRight.Reader, rightToLeft.Writer); + var definition = new EventDefinition("channel:paired-still-open"); + + left.Dispose(); + + var error = Record.Exception(() => right.Emit(definition, new TestPayload("still-open"))); + Assert.Null(error); + } + + [Fact] + public async Task InboundCompletion_WhenOutboundNotOwned_LeavesOutboundWriterUsable() + { + var inbound = Channel.CreateUnbounded(); + var outbound = Channel.CreateUnbounded(); + using var endpoint = new ChannelEndpoint( + inbound.Reader, + outbound.Writer, + new ChannelEndpointOptions { CompleteOutboundOnTerminal = false }); + var payload = await WaitForClosedAsync(endpoint, () => inbound.Writer.TryComplete()); + var error = Assert.IsType(payload.Error); + + Assert.Equal("Channel closed.", error.Message); + Assert.True(outbound.Writer.TryWrite( + new ChannelMessage(new EventEnvelope("channel:external-owner", new TestPayload("still-open"))))); + } + + [Fact] + public async Task InboundFault_WhenOutboundNotOwned_LeavesOutboundWriterUsable() + { + var inbound = Channel.CreateUnbounded(); + var outbound = Channel.CreateUnbounded(); + using var endpoint = new ChannelEndpoint( + inbound.Reader, + outbound.Writer, + new ChannelEndpointOptions { CompleteOutboundOnTerminal = false }); + var expected = new InvalidOperationException("inbound failed"); + var payload = await WaitForClosedAsync(endpoint, () => inbound.Writer.TryComplete(expected)); + + Assert.Same(expected, payload.Error); + Assert.True(outbound.Writer.TryWrite( + new ChannelMessage(new EventEnvelope("channel:external-owner-fault", new TestPayload("still-open"))))); + } + [Fact] public async Task InboundNullEnvelope_FaultsEndpointAndEmitsClosedEvent() { @@ -232,7 +470,10 @@ public async Task InboundNullEnvelope_FaultsEndpointAndEmitsClosedEvent() using var endpoint = new ChannelEndpoint(inbound.Reader, outbound.Writer); var payload = await WaitForClosedAsync( endpoint, - async () => await inbound.Writer.WriteAsync(new ChannelMessage(null), TestContext.Current.CancellationToken)); + async () => + // Intentionally violate the public non-null contract to verify that malformed + // transport input still faults the endpoint deterministically. + await inbound.Writer.WriteAsync(new ChannelMessage(null!), TestContext.Current.CancellationToken)); Assert.IsType(payload.Error); } @@ -279,6 +520,47 @@ public async Task InboundListenerException_FaultsPendingInvokeOnPairedEndpointWi Assert.Same(expected, actual); } + [Fact] + public async Task InboundListenerException_FaultsActiveStreamOnPairedEndpointWithOriginalCause() + { + using var pipe = new ChannelPipe(); + var streamDefinition = new InvokeEventDefinition("channel:stream-peer-fault"); + var faultingDefinition = new EventDefinition("channel:stream-peer-listener-fault"); + var expected = new InvalidOperationException("listener failed"); + + using var _ = pipe.Right.RegisterStreamHandler(streamDefinition, PendingAsync); + await using var enumerator = pipe.Left + .CreateInvokeStreamClient(streamDefinition) + .InvokeAsync(1, CancellationToken.None) + .GetAsyncEnumerator(TestContext.Current.CancellationToken); + var pendingMoveNext = enumerator.MoveNextAsync().AsTask(); + + using var __ = pipe.Left.Subscribe(faultingDefinition, _ => throw expected); + + pipe.Right.Emit(faultingDefinition, new TestPayload("boom")); + + var actual = await Assert.ThrowsAsync( + async () => await pendingMoveNext.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken)); + Assert.Same(expected, actual); + } + + [Fact] + public async Task Dispose_WhenClosedEventListenerThrows_StillFaultsPendingInvoke() + { + using var pipe = new ChannelPipe(); + var definition = new InvokeEventDefinition("channel:dispose-closed-listener-fault"); + var client = pipe.Left.CreateInvokeClient(definition); + var pending = client.InvokeAsync("request", CancellationToken.None); + + using var _ = pipe.Left.Subscribe(ChannelEvents.Closed, _ => throw new InvalidOperationException("closed listener failed")); + + pipe.Left.Dispose(); + + var error = await Assert.ThrowsAsync( + async () => await pending.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken)); + Assert.Equal("Channel endpoint disposed.", error.Message); + } + [Fact] public async Task Emit_AfterInboundListenerException_PreservesOriginalTerminalCause() { @@ -334,6 +616,14 @@ private static async IAsyncEnumerable CountAsync( } } + private static async IAsyncEnumerable PendingAsync( + int _, + [EnumeratorCancellation] CancellationToken cancellationToken) + { + await Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken); + yield break; + } + private static async IAsyncEnumerable Numbers(params int[] values) { foreach (var value in values) @@ -411,6 +701,30 @@ private static async Task AssertEventuallyDisposedAsync(CancellationTokenSource Assert.Throws(() => _ = source.Token); } + private static void AssertDisposed(Action action) + { + var error = Assert.Throws(action); + Assert.Equal("Channel endpoint disposed.", error.Message); + } + + private static ClosedObserver CreateSingleClosedObserver(IEventContext context, string side) + { + var observer = new ClosedObserver( + new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously)); + observer.Subscription = context.Subscribe(ChannelEvents.Closed, _ => + { + if (Interlocked.Increment(ref observer.Count) == 1) + { + observer.Observed.TrySetResult(true); + return; + } + + observer.Observed.TrySetException(new InvalidOperationException($"{side} closed event observed more than once.")); + }); + + return observer; + } + private static async Task> CollectAsync(IAsyncEnumerable source) { var results = new List(); @@ -423,6 +737,116 @@ private static async Task> CollectAsync(IAsyncEnumerable source) return results; } + private sealed class ClosedObserver(TaskCompletionSource observed) + { + public IDisposable Subscription { get; set; } = null!; + + public TaskCompletionSource Observed { get; } = observed; + + public int Count; + } + + private sealed class ProbeRejectingWriter : ChannelWriter + { + public int WriteAsyncCalls; + + public override bool TryComplete(Exception? error = null) + { + return true; + } + + public override bool TryWrite(ChannelMessage item) + { + return false; + } + + public override ValueTask WaitToWriteAsync(CancellationToken cancellationToken = default) + { + return ValueTask.FromResult(true); + } + + public override ValueTask WriteAsync(ChannelMessage item, CancellationToken cancellationToken = default) + { + Interlocked.Increment(ref WriteAsyncCalls); + return ValueTask.FromException(new InvalidOperationException("WriteAsync should not be called.")); + } + } + + private sealed class ProbePendingWaitWriter : ChannelWriter, IValueTaskSource + { + private ManualResetValueTaskSourceCore _wait = new() + { + RunContinuationsAsynchronously = true + }; + + private CancellationTokenRegistration _cancellationRegistration; + private readonly TaskCompletionSource _observed = new(TaskCreationOptions.RunContinuationsAsynchronously); + + public int WaitToWriteCalls; + public int CanceledWaits; + public int GetResultCalls; + public int WriteAsyncCalls; + + public Task ObservedTask => _observed.Task; + + public override bool TryComplete(Exception? error = null) + { + return true; + } + + public override bool TryWrite(ChannelMessage item) + { + return false; + } + + public override ValueTask WaitToWriteAsync(CancellationToken cancellationToken = default) + { + Interlocked.Increment(ref WaitToWriteCalls); + + _cancellationRegistration = cancellationToken.Register(() => + { + Interlocked.Increment(ref CanceledWaits); + _wait.SetException(new OperationCanceledException(cancellationToken)); + }); + + return new ValueTask(this, _wait.Version); + } + + public override ValueTask WriteAsync(ChannelMessage item, CancellationToken cancellationToken = default) + { + Interlocked.Increment(ref WriteAsyncCalls); + return ValueTask.FromException(new InvalidOperationException("WriteAsync should not be called.")); + } + + bool IValueTaskSource.GetResult(short token) + { + try + { + return _wait.GetResult(token); + } + finally + { + _cancellationRegistration.Dispose(); + Interlocked.Increment(ref GetResultCalls); + _observed.TrySetResult(true); + } + } + + ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) + { + return _wait.GetStatus(token); + } + + void IValueTaskSource.OnCompleted( + Action continuation, + object? state, + short token, + ValueTaskSourceOnCompletedFlags flags) + { + _wait.OnCompleted(continuation, state, token, flags); + } + } + private sealed record TestPayload(string Value); private sealed record EchoRequest(string Value); private sealed record EchoResponse(string Value);