Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/Cortex.Mediator/DependencyInjection/MediatorOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,29 @@ public class MediatorOptions
/// </summary>
public ServiceLifetime HandlerLifetime { get; set; } = ServiceLifetime.Scoped;

/// <summary>
/// Gets the type of notification publish strategy to use.
/// Defaults to <see cref="ParallelNotificationStrategy"/>.
/// Use <see cref="UseNotificationPublishStrategy{TStrategy}"/> to change.
/// </summary>
internal Type NotificationPublishStrategyType { get; private set; } = typeof(ParallelNotificationStrategy);

/// <summary>
/// Sets the strategy used to publish notifications to multiple handlers.
/// </summary>
/// <typeparam name="TStrategy">
/// The strategy implementation. Built-in options:
/// <see cref="ParallelNotificationStrategy"/> (default) — all handlers run in parallel via Task.WhenAll,
/// <see cref="SequentialNotificationStrategy"/> — handlers run one at a time in registration order,
/// <see cref="StopOnFirstFailureNotificationStrategy"/> — sequential, stops on first exception.
/// </typeparam>
public MediatorOptions UseNotificationPublishStrategy<TStrategy>()
where TStrategy : class, INotificationPublishStrategy
{
NotificationPublishStrategyType = typeof(TStrategy);
return this;
}


/// <summary>
/// Register a *closed* command pipeline behavior.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public static IServiceCollection AddCortexMediator(
configure?.Invoke(options);

services.AddScoped<IMediator, Mediator>();
services.AddSingleton(typeof(INotificationPublishStrategy), options.NotificationPublishStrategyType);

// Validation has been removed for issue #118
//services.AddValidatorsFromAssemblies(handlerAssemblyMarkerTypes.Select(t => t.Assembly));
Expand Down
22 changes: 16 additions & 6 deletions src/Cortex.Mediator/Mediator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,20 +148,19 @@ public async Task PublishAsync<TNotification>(
var behaviors = _serviceProvider.GetServices<INotificationPipelineBehavior<TNotification>>();

// Materialize behaviors once since we need to iterate multiple times
// Use stackalloc-friendly pattern for small counts
var behaviorList = behaviors as INotificationPipelineBehavior<TNotification>[] ?? behaviors.ToArray();
Array.Reverse(behaviorList);

// Count handlers to pre-allocate task array
// Count handlers to pre-allocate delegate list
var handlerList = handlers as INotificationHandler<TNotification>[] ?? handlers.ToArray();
if (handlerList.Length == 0)
return;

var tasks = new Task[handlerList.Length];
// Build a pipeline delegate for each handler
var handlerDelegates = new Func<Task>[handlerList.Length];
for (int i = 0; i < handlerList.Length; i++)
{
var handler = handlerList[i];
// Build the pipeline for this specific handler
NotificationHandlerDelegate handlerDelegate = () => handler.Handle(notification, cancellationToken);

// Wrap the handler with all behaviors (already reversed)
Expand All @@ -172,10 +171,21 @@ public async Task PublishAsync<TNotification>(
handlerDelegate = () => currentBehavior.Handle(notification, currentDelegate, cancellationToken);
}

tasks[i] = handlerDelegate();
var finalDelegate = handlerDelegate;
handlerDelegates[i] = () => finalDelegate();
}

await Task.WhenAll(tasks);
// Delegate to the publish strategy
var strategy = _serviceProvider.GetService<INotificationPublishStrategy>();
if (strategy != null)
{
await strategy.PublishAsync(handlerDelegates, cancellationToken);
}
else
{
// Fallback to parallel when no strategy is registered (e.g., manual DI without AddCortexMediator)
await Task.WhenAll(handlerDelegates.Select(d => d()));
}
}

public Task PublishAsync(INotification notification, CancellationToken cancellationToken = default)
Expand Down
22 changes: 22 additions & 0 deletions src/Cortex.Mediator/Notifications/INotificationPublishStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Cortex.Mediator.Notifications
{
/// <summary>
/// Defines the strategy for publishing notifications to multiple handler pipelines.
/// </summary>
public interface INotificationPublishStrategy
{
/// <summary>
/// Publishes a notification by executing the provided handler delegates according to the strategy.
/// </summary>
/// <param name="handlerDelegates">The handler pipeline delegates to execute.</param>
/// <param name="cancellationToken">The cancellation token.</param>
Task PublishAsync(
IEnumerable<Func<Task>> handlerDelegates,
CancellationToken cancellationToken);
}
}
27 changes: 27 additions & 0 deletions src/Cortex.Mediator/Notifications/ParallelNotificationStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Cortex.Mediator.Notifications
{
/// <summary>
/// Publishes notifications to all handlers in parallel using Task.WhenAll.
/// This is the default strategy.
/// </summary>
public sealed class ParallelNotificationStrategy : INotificationPublishStrategy
{
public Task PublishAsync(
IEnumerable<Func<Task>> handlerDelegates,
CancellationToken cancellationToken)
{
var tasks = handlerDelegates.Select(handler => handler()).ToArray();

if (tasks.Length == 0)
return Task.CompletedTask;

return Task.WhenAll(tasks);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Cortex.Mediator.Notifications
{
/// <summary>
/// Publishes notifications to handlers one at a time in registration order.
/// If a handler throws, the exception propagates and remaining handlers are not executed.
/// </summary>
public sealed class SequentialNotificationStrategy : INotificationPublishStrategy
{
public async Task PublishAsync(
IEnumerable<Func<Task>> handlerDelegates,
CancellationToken cancellationToken)
{
foreach (var handler in handlerDelegates)
{
cancellationToken.ThrowIfCancellationRequested();
await handler();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Cortex.Mediator.Notifications
{
/// <summary>
/// Publishes notifications to handlers sequentially, stopping immediately on the first exception.
/// Unlike <see cref="SequentialNotificationStrategy"/>, this strategy explicitly signals that
/// failure handling is the primary concern — remaining handlers are intentionally skipped.
/// </summary>
public sealed class StopOnFirstFailureNotificationStrategy : INotificationPublishStrategy
{
public async Task PublishAsync(
IEnumerable<Func<Task>> handlerDelegates,
CancellationToken cancellationToken)
{
foreach (var handler in handlerDelegates)
{
cancellationToken.ThrowIfCancellationRequested();
await handler();
}
}
}
}
Loading