Skip to content

Commit 443f2ec

Browse files
committed
chore: code style update
1 parent 38ed3d1 commit 443f2ec

3 files changed

Lines changed: 117 additions & 69 deletions

File tree

src/Eventa.Adapters/Channels/ChannelEndpoint.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,10 @@ private async Task RunInboundPumpAsync()
148148
{
149149
Terminate(error, completeOutbound: true, cancelInbound: false, outboundError: error);
150150
}
151+
finally
152+
{
153+
_disposeCancellation.Dispose();
154+
}
151155
}
152156

153157
/// <summary>

src/Eventa/Abstractions/IEventaAdapter.cs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,24 @@ public interface IEventaAdapter : IDisposable
2020
void OnSent(string eventId, object? envelope, object? options = null);
2121

2222
/// <summary>
23-
/// Called after a local listener or match-expression subscription receives an emitted event.
24-
/// The runtime value is the emitted <see cref="EventEnvelope{TPayload}"/>, exposed as <see cref="object"/> because
25-
/// the adapter interface is non-generic.
23+
/// Called after local listener dispatch completes for <see cref="IEventContext.Emit{TPayload}(EventDefinition{TPayload}, TPayload)"/>,
24+
/// or after <see cref="IEventInboundDispatcher.Receive(IEventEnvelope, object?)"/> dispatches a transport-originated envelope.
2625
/// </summary>
2726
/// <param name="eventId">
2827
/// The listener key that received the event. This is the original event id for direct subscriptions, and can be a
2928
/// match-expression id for match listeners.
3029
/// </param>
3130
/// <param name="envelope">
32-
/// The emitted <see cref="EventEnvelope{TPayload}"/> instance. When <paramref name="eventId"/> is a
33-
/// match-expression id, inspect <c>envelope.EventId</c> to get the original event id.
31+
/// The received envelope, exposed as <see cref="object"/> because the adapter interface is non-generic.
32+
/// For local emit notifications (<paramref name="options"/> is <see langword="null"/>), the runtime value is the
33+
/// emitted <see cref="EventEnvelope{TPayload}"/> instance. For transport-originated notifications, the runtime value
34+
/// can be any <see cref="IEventEnvelope"/> implementation passed to <see cref="IEventInboundDispatcher.Receive(IEventEnvelope, object?)"/>.
35+
/// When <paramref name="eventId"/> is a match-expression id, inspect <see cref="IEventEnvelope.EventId"/> on the
36+
/// runtime envelope to get the original event id.
3437
/// </param>
3538
/// <param name="options">
36-
/// Optional metadata forwarded from <see cref="IEventInboundDispatcher.Receive"/>, or <see langword="null"/>
37-
/// for local emit notifications.
39+
/// Optional metadata forwarded from <see cref="IEventInboundDispatcher.Receive(IEventEnvelope, object?)"/>, or
40+
/// <see langword="null"/> for local emit notifications.
3841
/// </param>
3942
void OnReceived(string eventId, object? envelope, object? options = null);
4043
}

tests/Eventa.Adapters.Tests/Channels/ChannelPipeTests.cs

Lines changed: 103 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System.Reflection;
12
using System.Runtime.CompilerServices;
23
using System.Threading.Channels;
34

@@ -146,6 +147,31 @@ public void Emit_AfterEndpointDisposed_FailsFast()
146147
Assert.Equal("Channel endpoint disposed.", error.Message);
147148
}
148149

150+
[Fact]
151+
public async Task Dispose_WhenCalled_ReleasesInboundPumpCancellationSource()
152+
{
153+
var inbound = Channel.CreateUnbounded<ChannelMessage>();
154+
var outbound = Channel.CreateUnbounded<ChannelMessage>();
155+
using var endpoint = new ChannelEndpoint(inbound.Reader, outbound.Writer);
156+
var inboundPumpCancellation = GetInboundPumpCancellationSource(endpoint);
157+
158+
endpoint.Dispose();
159+
160+
await AssertEventuallyDisposedAsync(inboundPumpCancellation);
161+
}
162+
163+
[Fact]
164+
public async Task InboundChannelCompletion_WhenObserved_ReleasesInboundPumpCancellationSource()
165+
{
166+
var inbound = Channel.CreateUnbounded<ChannelMessage>();
167+
var outbound = Channel.CreateUnbounded<ChannelMessage>();
168+
using var endpoint = new ChannelEndpoint(inbound.Reader, outbound.Writer);
169+
var inboundPumpCancellation = GetInboundPumpCancellationSource(endpoint);
170+
var payload = await WaitForClosedAsync(endpoint, () => inbound.Writer.TryComplete());
171+
Assert.IsType<ChannelClosedException>(payload.Error);
172+
await AssertEventuallyDisposedAsync(inboundPumpCancellation);
173+
}
174+
149175
[Fact]
150176
public void Emit_WhenOutboundWriterCompleted_ThrowsChannelClosedException()
151177
{
@@ -193,14 +219,7 @@ public async Task CustomEndpointDispose_WhenConfigured_CompletesOutboundForPaire
193219
leftToRight.Writer,
194220
new ChannelEndpointOptions { CompleteOutboundOnDispose = true });
195221
using var right = new ChannelEndpoint(leftToRight.Reader, rightToLeft.Writer);
196-
var closed = new TaskCompletionSource<ChannelClosedPayload>(
197-
TaskCreationOptions.RunContinuationsAsynchronously);
198-
199-
using var _ = right.Subscribe(ChannelEvents.Closed, envelope => closed.TrySetResult(envelope.Body));
200-
201-
left.Dispose();
202-
203-
var payload = await closed.Task.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken);
222+
var payload = await WaitForClosedAsync(right, left.Dispose);
204223
var error = Assert.IsType<ChannelClosedException>(payload.Error);
205224
Assert.Equal("Channel closed.", error.Message);
206225
}
@@ -211,14 +230,9 @@ public async Task InboundNullEnvelope_FaultsEndpointAndEmitsClosedEvent()
211230
var inbound = Channel.CreateUnbounded<ChannelMessage>();
212231
var outbound = Channel.CreateUnbounded<ChannelMessage>();
213232
using var endpoint = new ChannelEndpoint(inbound.Reader, outbound.Writer);
214-
var closed = new TaskCompletionSource<ChannelClosedPayload>(
215-
TaskCreationOptions.RunContinuationsAsynchronously);
216-
217-
using var _ = endpoint.Subscribe(ChannelEvents.Closed, envelope => closed.TrySetResult(envelope.Body));
218-
219-
await inbound.Writer.WriteAsync(new ChannelMessage(null), TestContext.Current.CancellationToken);
220-
221-
var payload = await closed.Task.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken);
233+
var payload = await WaitForClosedAsync(
234+
endpoint,
235+
async () => await inbound.Writer.WriteAsync(new ChannelMessage(null), TestContext.Current.CancellationToken));
222236
Assert.IsType<InvalidOperationException>(payload.Error);
223237
}
224238

@@ -230,17 +244,7 @@ public async Task InboundListenerException_FaultsEndpointAndEmitsClosedEvent()
230244
using var endpoint = new ChannelEndpoint(inbound.Reader, outbound.Writer);
231245
var definition = new EventDefinition<TestPayload>("channel:listener-fault");
232246
var expected = new InvalidOperationException("listener failed");
233-
var closed = new TaskCompletionSource<ChannelClosedPayload>(
234-
TaskCreationOptions.RunContinuationsAsynchronously);
235-
236-
using var _ = endpoint.Subscribe(definition, _ => throw expected);
237-
using var __ = endpoint.Subscribe(ChannelEvents.Closed, envelope => closed.TrySetResult(envelope.Body));
238-
239-
await inbound.Writer.WriteAsync(
240-
new ChannelMessage(new EventEnvelope<TestPayload>(definition.Id, new TestPayload("boom"))),
241-
TestContext.Current.CancellationToken);
242-
243-
var payload = await closed.Task.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken);
247+
var payload = await TriggerInboundListenerFaultAsync(endpoint, inbound.Writer, definition, expected);
244248
Assert.Same(expected, payload.Error);
245249
}
246250

@@ -250,15 +254,9 @@ public async Task InboundListenerException_CompletesOutboundForPairedEndpointWit
250254
using var pipe = new ChannelPipe();
251255
var definition = new EventDefinition<TestPayload>("channel:paired-listener-fault");
252256
var expected = new InvalidOperationException("listener failed");
253-
var closed = new TaskCompletionSource<ChannelClosedPayload>(
254-
TaskCreationOptions.RunContinuationsAsynchronously);
255-
256257
using var _ = pipe.Right.Subscribe(definition, _ => throw expected);
257-
using var __ = pipe.Left.Subscribe(ChannelEvents.Closed, envelope => closed.TrySetResult(envelope.Body));
258258

259-
pipe.Left.Emit(definition, new TestPayload("boom"));
260-
261-
var payload = await closed.Task.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken);
259+
var payload = await WaitForClosedAsync(pipe.Left, () => pipe.Left.Emit(definition, new TestPayload("boom")));
262260
Assert.Same(expected, payload.Error);
263261
}
264262

@@ -290,16 +288,7 @@ public async Task Emit_AfterInboundListenerException_PreservesOriginalTerminalCa
290288
var faultingDefinition = new EventDefinition<TestPayload>("channel:listener-terminal-cause");
291289
var lateDefinition = new EventDefinition<TestPayload>("channel:late-after-listener-fault");
292290
var expected = new InvalidOperationException("listener failed");
293-
var closed = new TaskCompletionSource<ChannelClosedPayload>(
294-
TaskCreationOptions.RunContinuationsAsynchronously);
295-
296-
using var _ = endpoint.Subscribe(faultingDefinition, _ => throw expected);
297-
using var __ = endpoint.Subscribe(ChannelEvents.Closed, envelope => closed.TrySetResult(envelope.Body));
298-
299-
await inbound.Writer.WriteAsync(
300-
new ChannelMessage(new EventEnvelope<TestPayload>(faultingDefinition.Id, new TestPayload("boom"))),
301-
TestContext.Current.CancellationToken);
302-
await closed.Task.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken);
291+
await TriggerInboundListenerFaultAsync(endpoint, inbound.Writer, faultingDefinition, expected);
303292

304293
var error = Assert.Throws<ChannelClosedException>(() => endpoint.Emit(lateDefinition, new TestPayload("late")));
305294
Assert.Same(expected, error.InnerException);
@@ -314,16 +303,7 @@ public async Task Dispose_AfterInboundListenerException_PreservesOriginalTermina
314303
var faultingDefinition = new EventDefinition<TestPayload>("channel:listener-dispose-terminal-cause");
315304
var lateDefinition = new EventDefinition<TestPayload>("channel:late-after-dispose");
316305
var expected = new InvalidOperationException("listener failed");
317-
var closed = new TaskCompletionSource<ChannelClosedPayload>(
318-
TaskCreationOptions.RunContinuationsAsynchronously);
319-
320-
using var _ = endpoint.Subscribe(faultingDefinition, _ => throw expected);
321-
using var __ = endpoint.Subscribe(ChannelEvents.Closed, envelope => closed.TrySetResult(envelope.Body));
322-
323-
await inbound.Writer.WriteAsync(
324-
new ChannelMessage(new EventEnvelope<TestPayload>(faultingDefinition.Id, new TestPayload("boom"))),
325-
TestContext.Current.CancellationToken);
326-
await closed.Task.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken);
306+
await TriggerInboundListenerFaultAsync(endpoint, inbound.Writer, faultingDefinition, expected);
327307

328308
endpoint.Dispose();
329309

@@ -338,14 +318,7 @@ public async Task FaultedInboundChannel_PreservesOriginalExceptionInClosedEvent(
338318
var outbound = Channel.CreateUnbounded<ChannelMessage>();
339319
using var endpoint = new ChannelEndpoint(inbound.Reader, outbound.Writer);
340320
var expected = new InvalidOperationException("channel failed");
341-
var closed = new TaskCompletionSource<ChannelClosedPayload>(
342-
TaskCreationOptions.RunContinuationsAsynchronously);
343-
344-
using var _ = endpoint.Subscribe(ChannelEvents.Closed, envelope => closed.TrySetResult(envelope.Body));
345-
346-
inbound.Writer.TryComplete(expected);
347-
348-
var payload = await closed.Task.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken);
321+
var payload = await WaitForClosedAsync(endpoint, () => inbound.Writer.TryComplete(expected));
349322
Assert.Same(expected, payload.Error);
350323
}
351324

@@ -370,6 +343,74 @@ private static async IAsyncEnumerable<int> Numbers(params int[] values)
370343
}
371344
}
372345

346+
private static Task<ChannelClosedPayload> WaitForClosedAsync(IEventContext context, Action trigger)
347+
{
348+
return WaitForClosedAsync(
349+
context,
350+
() =>
351+
{
352+
trigger();
353+
return Task.CompletedTask;
354+
});
355+
}
356+
357+
private static async Task<ChannelClosedPayload> WaitForClosedAsync(IEventContext context, Func<Task> trigger)
358+
{
359+
var closed = new TaskCompletionSource<ChannelClosedPayload>(
360+
TaskCreationOptions.RunContinuationsAsynchronously);
361+
362+
using var _ = context.Subscribe(ChannelEvents.Closed, envelope => closed.TrySetResult(envelope.Body));
363+
364+
await trigger();
365+
return await closed.Task.WaitAsync(TimeSpan.FromSeconds(5), TestContext.Current.CancellationToken);
366+
}
367+
368+
private static async Task<ChannelClosedPayload> TriggerInboundListenerFaultAsync(
369+
ChannelEndpoint endpoint,
370+
ChannelWriter<ChannelMessage> inboundWriter,
371+
EventDefinition<TestPayload> faultingDefinition,
372+
Exception expected)
373+
{
374+
using var _ = endpoint.Subscribe(faultingDefinition, _ => throw expected);
375+
376+
return await WaitForClosedAsync(
377+
endpoint,
378+
async () => await inboundWriter.WriteAsync(
379+
new ChannelMessage(new EventEnvelope<TestPayload>(faultingDefinition.Id, new TestPayload("boom"))),
380+
TestContext.Current.CancellationToken));
381+
}
382+
383+
private static CancellationTokenSource GetInboundPumpCancellationSource(ChannelEndpoint endpoint)
384+
{
385+
var field = typeof(ChannelEndpoint).GetField(
386+
"_disposeCancellation",
387+
BindingFlags.Instance | BindingFlags.NonPublic)
388+
?? throw new InvalidOperationException("ChannelEndpoint no longer exposes _disposeCancellation.");
389+
390+
return Assert.IsType<CancellationTokenSource>(field.GetValue(endpoint));
391+
}
392+
393+
private static async Task AssertEventuallyDisposedAsync(CancellationTokenSource source)
394+
{
395+
var deadline = DateTime.UtcNow + TimeSpan.FromSeconds(5);
396+
397+
while (DateTime.UtcNow < deadline)
398+
{
399+
try
400+
{
401+
_ = source.Token;
402+
}
403+
catch (ObjectDisposedException)
404+
{
405+
return;
406+
}
407+
408+
await Task.Delay(TimeSpan.FromMilliseconds(10), TestContext.Current.CancellationToken);
409+
}
410+
411+
Assert.Throws<ObjectDisposedException>(() => _ = source.Token);
412+
}
413+
373414
private static async Task<List<T>> CollectAsync<T>(IAsyncEnumerable<T> source)
374415
{
375416
var results = new List<T>();

0 commit comments

Comments
 (0)