Skip to content

Commit 52dcb94

Browse files
authored
Merge pull request #232 from buildersoftio/v3/feature/215
v3/feature/215: Pluggable Notification Publish Strategies
2 parents 4daf093 + b2008cf commit 52dcb94

8 files changed

Lines changed: 567 additions & 6 deletions

File tree

src/Cortex.Mediator/DependencyInjection/MediatorOptions.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,29 @@ public class MediatorOptions
2626
/// </summary>
2727
public ServiceLifetime HandlerLifetime { get; set; } = ServiceLifetime.Scoped;
2828

29+
/// <summary>
30+
/// Gets the type of notification publish strategy to use.
31+
/// Defaults to <see cref="ParallelNotificationStrategy"/>.
32+
/// Use <see cref="UseNotificationPublishStrategy{TStrategy}"/> to change.
33+
/// </summary>
34+
internal Type NotificationPublishStrategyType { get; private set; } = typeof(ParallelNotificationStrategy);
35+
36+
/// <summary>
37+
/// Sets the strategy used to publish notifications to multiple handlers.
38+
/// </summary>
39+
/// <typeparam name="TStrategy">
40+
/// The strategy implementation. Built-in options:
41+
/// <see cref="ParallelNotificationStrategy"/> (default) — all handlers run in parallel via Task.WhenAll,
42+
/// <see cref="SequentialNotificationStrategy"/> — handlers run one at a time in registration order,
43+
/// <see cref="StopOnFirstFailureNotificationStrategy"/> — sequential, stops on first exception.
44+
/// </typeparam>
45+
public MediatorOptions UseNotificationPublishStrategy<TStrategy>()
46+
where TStrategy : class, INotificationPublishStrategy
47+
{
48+
NotificationPublishStrategyType = typeof(TStrategy);
49+
return this;
50+
}
51+
2952

3053
/// <summary>
3154
/// Register a *closed* command pipeline behavior.

src/Cortex.Mediator/DependencyInjection/ServiceCollectionExtensions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public static IServiceCollection AddCortexMediator(
2323
configure?.Invoke(options);
2424

2525
services.AddScoped<IMediator, Mediator>();
26+
services.AddSingleton(typeof(INotificationPublishStrategy), options.NotificationPublishStrategyType);
2627

2728
// Validation has been removed for issue #118
2829
//services.AddValidatorsFromAssemblies(handlerAssemblyMarkerTypes.Select(t => t.Assembly));

src/Cortex.Mediator/Mediator.cs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -148,20 +148,19 @@ public async Task PublishAsync<TNotification>(
148148
var behaviors = _serviceProvider.GetServices<INotificationPipelineBehavior<TNotification>>();
149149

150150
// Materialize behaviors once since we need to iterate multiple times
151-
// Use stackalloc-friendly pattern for small counts
152151
var behaviorList = behaviors as INotificationPipelineBehavior<TNotification>[] ?? behaviors.ToArray();
153152
Array.Reverse(behaviorList);
154153

155-
// Count handlers to pre-allocate task array
154+
// Count handlers to pre-allocate delegate list
156155
var handlerList = handlers as INotificationHandler<TNotification>[] ?? handlers.ToArray();
157156
if (handlerList.Length == 0)
158157
return;
159158

160-
var tasks = new Task[handlerList.Length];
159+
// Build a pipeline delegate for each handler
160+
var handlerDelegates = new Func<Task>[handlerList.Length];
161161
for (int i = 0; i < handlerList.Length; i++)
162162
{
163163
var handler = handlerList[i];
164-
// Build the pipeline for this specific handler
165164
NotificationHandlerDelegate handlerDelegate = () => handler.Handle(notification, cancellationToken);
166165

167166
// Wrap the handler with all behaviors (already reversed)
@@ -172,10 +171,21 @@ public async Task PublishAsync<TNotification>(
172171
handlerDelegate = () => currentBehavior.Handle(notification, currentDelegate, cancellationToken);
173172
}
174173

175-
tasks[i] = handlerDelegate();
174+
var finalDelegate = handlerDelegate;
175+
handlerDelegates[i] = () => finalDelegate();
176176
}
177177

178-
await Task.WhenAll(tasks);
178+
// Delegate to the publish strategy
179+
var strategy = _serviceProvider.GetService<INotificationPublishStrategy>();
180+
if (strategy != null)
181+
{
182+
await strategy.PublishAsync(handlerDelegates, cancellationToken);
183+
}
184+
else
185+
{
186+
// Fallback to parallel when no strategy is registered (e.g., manual DI without AddCortexMediator)
187+
await Task.WhenAll(handlerDelegates.Select(d => d()));
188+
}
179189
}
180190

181191
public Task PublishAsync(INotification notification, CancellationToken cancellationToken = default)
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
namespace Cortex.Mediator.Notifications
7+
{
8+
/// <summary>
9+
/// Defines the strategy for publishing notifications to multiple handler pipelines.
10+
/// </summary>
11+
public interface INotificationPublishStrategy
12+
{
13+
/// <summary>
14+
/// Publishes a notification by executing the provided handler delegates according to the strategy.
15+
/// </summary>
16+
/// <param name="handlerDelegates">The handler pipeline delegates to execute.</param>
17+
/// <param name="cancellationToken">The cancellation token.</param>
18+
Task PublishAsync(
19+
IEnumerable<Func<Task>> handlerDelegates,
20+
CancellationToken cancellationToken);
21+
}
22+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
namespace Cortex.Mediator.Notifications
8+
{
9+
/// <summary>
10+
/// Publishes notifications to all handlers in parallel using Task.WhenAll.
11+
/// This is the default strategy.
12+
/// </summary>
13+
public sealed class ParallelNotificationStrategy : INotificationPublishStrategy
14+
{
15+
public Task PublishAsync(
16+
IEnumerable<Func<Task>> handlerDelegates,
17+
CancellationToken cancellationToken)
18+
{
19+
var tasks = handlerDelegates.Select(handler => handler()).ToArray();
20+
21+
if (tasks.Length == 0)
22+
return Task.CompletedTask;
23+
24+
return Task.WhenAll(tasks);
25+
}
26+
}
27+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
namespace Cortex.Mediator.Notifications
7+
{
8+
/// <summary>
9+
/// Publishes notifications to handlers one at a time in registration order.
10+
/// If a handler throws, the exception propagates and remaining handlers are not executed.
11+
/// </summary>
12+
public sealed class SequentialNotificationStrategy : INotificationPublishStrategy
13+
{
14+
public async Task PublishAsync(
15+
IEnumerable<Func<Task>> handlerDelegates,
16+
CancellationToken cancellationToken)
17+
{
18+
foreach (var handler in handlerDelegates)
19+
{
20+
cancellationToken.ThrowIfCancellationRequested();
21+
await handler();
22+
}
23+
}
24+
}
25+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
namespace Cortex.Mediator.Notifications
7+
{
8+
/// <summary>
9+
/// Publishes notifications to handlers sequentially, stopping immediately on the first exception.
10+
/// Unlike <see cref="SequentialNotificationStrategy"/>, this strategy explicitly signals that
11+
/// failure handling is the primary concern — remaining handlers are intentionally skipped.
12+
/// </summary>
13+
public sealed class StopOnFirstFailureNotificationStrategy : INotificationPublishStrategy
14+
{
15+
public async Task PublishAsync(
16+
IEnumerable<Func<Task>> handlerDelegates,
17+
CancellationToken cancellationToken)
18+
{
19+
foreach (var handler in handlerDelegates)
20+
{
21+
cancellationToken.ThrowIfCancellationRequested();
22+
await handler();
23+
}
24+
}
25+
}
26+
}

0 commit comments

Comments
 (0)