Skip to content

Commit 0353f1f

Browse files
authored
Command Processor Dispatching Strategy (BrighterCommand#3652)
* feat: add an ADR for routing in command dispatcher * feat: add a type based router * fix: move to subscriberregistry based approach * fix: move the subscriber registry to using an expression to determine the handler for a type * fix: (Failing Tests) we need to add the request to the call to Build and BuildAsync in the PipelineBuilder and pass that to Get on the SubscriberRegistry, so that we make the request available when choosing the type. * chore: additional adr notes * fix: pass an IRequest into calls to Build on the pipeline * chore: fix errors when enabling nullable in DependencyInjection * feat: add async registration of an agreement dispatcher * fix: change test name to reflect what it asserts * feat: add further tests for agreement dispatcher * fix: add send tests, alter MyCommandHandlerAsync to have request monitoring functionality of sync counterpart * chore: update ADR for design changes * chore: add async publish test * feat: add basic agreement tests * feat: add agreement tests for multiple * feat: allow an excepton list for auto-registration so that dynamic handlers do not get auto-registered as a simple lookup * fix: issues arising from changes to test doubles and nullable * fix: minor fixes for PR feedback * message pump adr, seems to be on this branch?
1 parent 33858cb commit 0353f1f

102 files changed

Lines changed: 1284 additions & 326 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# 28. Support Agreement Dispatcher
2+
3+
Date: 2025-07-07
4+
5+
## Status
6+
7+
Draft
8+
9+
## Context
10+
11+
Brighter has support for dispatching message to a handler based on the message type. This is a common default, where a particular command or event has just one handler and we want to dispatch it to that handler. The main value of the dispatcher in this case, comes from the middleware which acts as a command processor. However another goal of a dispatcher is to allow us to choose the hander that we route to. This is not possible with the current dispatcher, as it only supports a single handler for a given message type.
12+
13+
The current dispatcher does not support the case where we want to route a message to a handler based on some other criteria, such as the content of the request or some external state in the context. This is a common requirement in many systems, and it is not currently supported by Brighter.
14+
15+
One scenario that is typical here is Martin Fowler's [Agreement Dispatcher](https://martinfowler.com/eaaDev/AgreementDispatcher.html) which is a pattern for routing messages to different handlers depending on time - as new rules come into force, we want to route to different destinations.
16+
17+
## Decision
18+
19+
We will change SubscriberRegistry to support different routing strategies. We will change the implementation of the `SubscriberRegistry` to take a lambda function that takes a `IRequest` and `IRequestContext` and returns a `List<Type>` of matching handlers. This allows users to register a method to determine the type to return. We will provide support for `Type` based routing by simply returning the `List<Type>` that you register as the return value from this method, and allow you to register others.
20+
21+
Because SubscriberRegistry supports many types, we can't use a generic parameter for this Lambda. This creates a limitation of this approach, in that the end user will have to cast the `IRequest` into an appropriate type to access it's state, to make routing decisions. This is a reasonable trade-off, but one worth documenting in our writeup, in case it is not obvious to users how to access the state of their command or event.
22+
23+
Then we need to allow you to register a lambda explicitly, over using the default—just return this type approach—so that you can utilize the request and context to determine what handler(s) to return. As you can return one or more, we usually append new registrations for the same type.
24+
25+
Because implementations of the `SubscriberRegistry` for `SeviceCollection` are needed to support `IAmAHandlerFactory` impelementations derived from `ServiceCollection`, the `SubscriberRegistry` method `Register` will take as a parameter the possible `IAmARequestHandler` handler types, which can be used to add them to the `ServiceCollection`, along with their lifetime.
26+
27+
When we do auto-registration, we are likely to register handlers that you want to provide an explicit factory for instead. Because we append new handlers to the chain of possible handlers, for publishing, you would never override the simple route type to handler in this case. So for auto-registration we will need to support an exclusion list: don't auto-register these handlers, because you want to explicitly register how the handler is determined.
28+
29+
## Consequences
30+
31+
Providing explicit routing makes another benefit of the Dispatcher model clear, beyond middleware, which is the ability to route to a handler based on content. As we build it into the dispatcher, this will also be available for messaging scenarios, and thus external events might have different routing strategies based on their content and the message itself (which is available in the context).
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# 29. Use CloudEventsType for Message Pump
2+
3+
Date: 2025-07-14
4+
5+
## Status
6+
7+
Accepted
8+
9+
## Context
10+
11+
We assume that the message pump listens to a [DataType Channel](https://www.enterpriseintegrationpatterns.com/patterns/messaging/DatatypeChannel.html), which is a channel that can receive messages of a specific type. The message pump will then process these messages and send them to the appropriate destination.
12+
13+
What happens then if a producer decides to send multiple types of messages to the same channel? In this case, we need a way to differentiate between the different types of messages. We can use the [CloudEvents](https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md) `type` field for deciding what the actual type of the message is. This allows us to use the same channel for multiple types of messages, while still being able to differentiate between them. From the documentation: "This attribute contains a value describing the type of event related to the originating occurrence. Often this attribute is used for routing, observability, policy enforcement, etc. SHOULD be prefixed with a reverse-DNS name. The prefixed domain dictates the organization which defines the semantics of this event type."
14+
15+
In this case then the metadata of the message decides how it is routed. Our exissting message pump implementation does not support this as we configure the type of the channel via a generic parameter, yet in this case we do not know the type of the message in advance. We need to change the implementation of the message pump to support this.
16+
17+
## Decision
18+
19+
20+
## Consequences

samples/Scheduler/AwsTaskQueue/GreetingsPumper/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ private static async Task Main(string[] args)
8080
SchedulerTopicOrQueue = new RoutingKey("message-scheduler-topic"),
8181
OnConflict = OnSchedulerConflict.Overwrite
8282
})
83-
.AutoFromAssemblies(typeof(GreetingEvent).Assembly);
83+
.AutoFromAssemblies([typeof(GreetingEvent).Assembly]);
8484
}
8585

8686
services.AddHostedService<RunCommandProcessor>();

samples/Scheduler/QuartzTaskQueue/GreetingsPumper/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ private static async Task Main(string[] args)
8484
return new QuartzSchedulerFactory(
8585
factory.GetScheduler().GetAwaiter().GetResult());
8686
})
87-
.AutoFromAssemblies(typeof(GreetingEvent).Assembly);
87+
.AutoFromAssemblies([typeof(GreetingEvent).Assembly]);
8888
}
8989

9090
services.AddHostedService<RunCommandProcessor>();

samples/TaskQueue/AWSTaskQueue/GreetingsPumper/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ private static async Task Main(string[] args)
6262
{
6363
configure.ProducerRegistry = producerRegistry;
6464
})
65-
.AutoFromAssemblies(typeof(GreetingEvent).Assembly);
65+
.AutoFromAssemblies([typeof(GreetingEvent).Assembly]);
6666
}
6767

6868
services.AddHostedService<RunCommandProcessor>();

samples/TaskQueue/AWSTaskQueue/GreetingsSender/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ static async Task Main(string[] args)
8686
{
8787
configure.ProducerRegistry = producerRegistry;
8888
})
89-
.AutoFromAssemblies(typeof(GreetingEvent).Assembly);
89+
.AutoFromAssemblies([typeof(GreetingEvent).Assembly]);
9090

9191
var serviceProvider = serviceCollection.BuildServiceProvider();
9292

samples/TaskQueue/KafkaSchemaRegistry/GreetingsSender/Program.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ THE SOFTWARE. */
8484
{
8585
configure.ProducerRegistry = producerRegistry;
8686
})
87-
.MapperRegistryFromAssemblies(typeof(GreetingEvent).Assembly);
87+
.MapperRegistryFromAssemblies([typeof(GreetingEvent).Assembly]);
8888

8989
services.AddHostedService<TimedMessageGenerator>();
9090
})
@@ -94,7 +94,7 @@ THE SOFTWARE. */
9494
await host.RunAsync();
9595
return;
9696

97-
static PolicyRegistry RegisterPolicies()
97+
static PolicyRegistry? RegisterPolicies()
9898
{
9999
var retryPolicy = Policy.Handle<Exception>().WaitAndRetry(new[]
100100
{

samples/TaskQueue/KafkaTaskQueue/GreetingsSender/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ THE SOFTWARE. */
102102
{
103103
configure.ProducerRegistry = producerRegistry;
104104
})
105-
.MapperRegistryFromAssemblies(typeof(GreetingEvent).Assembly);
105+
.MapperRegistryFromAssemblies([typeof(GreetingEvent).Assembly]);
106106

107107
services.AddHostedService<TimedMessageGenerator>();
108108
})

samples/TaskQueue/MultiBus/GreetingsSender/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ THE SOFTWARE. */
121121
{
122122
configure.ProducerRegistry = new CombinedProducerRegistryFactory(rmqMessageProducerFactory, kafkaMessageProducerFactory).Create();
123123
})
124-
.MapperRegistryFromAssemblies(typeof(GreetingEvent).Assembly);
124+
.MapperRegistryFromAssemblies([typeof(GreetingEvent).Assembly]);
125125

126126
services.AddHostedService<TimedMessageGenerator>();
127127
})

samples/Transforms/AWSTransfomers/ClaimCheck/GreetingsSender/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ static void Main(string[] args)
8787
HttpClientFactory = provider.GetService<IHttpClientFactory>(),
8888
Strategy = StorageStrategy.Validate
8989
}))
90-
.AutoFromAssemblies(typeof(GreetingEvent).Assembly);
90+
.AutoFromAssemblies([typeof(GreetingEvent).Assembly]);
9191

9292
//We need this for the check as to whether an S3 bucket exists
9393
serviceCollection.AddHttpClient();

0 commit comments

Comments
 (0)