Skip to content

Commit 7e16d06

Browse files
iancooperpreardon
andauthored
Add a Mediator to Brighter (BrighterCommand#3370)
* chore: save work in progress * feat: add ADR for a mediator and assembly * feat: required workflow classes to write setup of rist mediator test * chore: safety check whilst releasing V9 and V10; does not build * chore: safety checkin * feat: add fire and forget action * feat: add requestreply outline * fix: folder name causing issues on MacOS which believes it is an application * feat: modifications to step and workflow responsibility * feat: add workflow data, over just using the bag * feat: need correlation id on event and command to support workflow * chore: check in to allow merging of master * feat: move completed workflows to the done state * feat: add an ADR for adding the specification pattern * feat: add the specification pattern * fix: typo in filename * chore: safety dance * chore: safety dance * feat: add a choice workflow action * fix: shared fixture problems * feat: add first version of robust flow * fix: remove IAmTheWorkflowData as unnecessary abstraction. * fix: make choice about choosing the next step from the workflow data * fix: tests not checking all paths * fix: add workflow patterns to ADR * feat: move to workflow patterns style, step and task; some behaviours shift * feat: first pass at Parallel; requires Scheduler-Runner split to Mediator * chore: safety check-in during scheduler/runner split work * fix: refactor relationship between job and step to be more explicit. Make some fields private * fix: step advancement manages job state * fix: add cancellation token interrupt of runner to all tests * chore: safety check in; fixing failing tests * fix: get the steps to save state, when they modify the job, not the runner. Timing for in-process handlers requires this. * fix: add fault version of robust request-reply * fix: add multi-threading support to Job * Update ASB Samples to use the Emulator (BrighterCommand#3391) * feat: adding a Wait step. * fix: don't try to await a thread; ensure we leave time for scheduler to fire on a delay * chore: merge from master * chore: safety check in; failing test on parallel split still * fix: parallel split was not terminating * fix: we should pass data to the callbacks; was capturing variable in enclosing scope in tests * fix: issue with dequeuejobasync hard crashing * feat: move the implementation to FBP * fix: test issues, post merge * fix: use primary constructor * fix: need to initialize second step --------- Co-authored-by: Paul Reardon <Paul@ReardonTech.UK>
1 parent 268e05a commit 7e16d06

99 files changed

Lines changed: 3872 additions & 145 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Brighter.sln

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Salutation_Sweeper", "sampl
315315
EndProject
316316
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.Locking.MsSql", "src\Paramore.Brighter.Locking.MsSql\Paramore.Brighter.Locking.MsSql.csproj", "{758EE237-C722-4A0A-908C-2D08C1E59025}"
317317
EndProject
318+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.Mediator", "src\Paramore.Brighter.Mediator\Paramore.Brighter.Mediator.csproj", "{F00B137A-C187-4C33-A37B-22AD40B71600}"
319+
EndProject
318320
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.MessagingGateway.RMQ.Sync", "src\Paramore.Brighter.MessagingGateway.RMQ.Sync\Paramore.Brighter.MessagingGateway.RMQ.Sync.csproj", "{A040750D-3EFB-4580-BF29-1C46FE1B3E5B}"
319321
EndProject
320322
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.RMQ.Sync.Tests", "tests\Paramore.Brighter.RMQ.Sync.Tests\Paramore.Brighter.RMQ.Sync.Tests.csproj", "{283D7ACD-50D9-4B36-93E0-E6AF3732456C}"

Directory.Packages.props

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,14 @@
9999
<PackageVersion Include="Swashbuckle.AspNetCore" Version="8.1.4" />
100100
<PackageVersion Include="System.Data.SqlClient" Version="4.9.0" />
101101
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="9.0.5" />
102+
<PackageVersion Include="System.Linq.Async" Version="6.0.1" />
102103
<PackageVersion Include="System.Reflection.TypeExtensions" Version="4.7.0" />
103104
<PackageVersion Include="System.Net.Http" Version="4.3.4" />
104105
<PackageVersion Include="System.Net.Security" Version="4.3.2" />
105106
<PackageVersion Include="System.Security.Cryptography.X509Certificates" Version="4.3.2" />
106107
<PackageVersion Include="System.Text.Json" Version="9.0.5" />
107108
<PackageVersion Include="System.Text.RegularExpressions" Version="4.3.1" />
109+
<PackageVersion Include="System.Threading.Channels" Version="9.0.0" />
108110
<PackageVersion Include="xunit" Version="2.9.3" />
109111
<PackageVersion Include="xunit.runner.visualstudio" Version="3.1.1">
110112
<PrivateAssets>all</PrivateAssets>

docs/adr/0020-reduce-esb-complexity.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# 20. Reduce External Service Bus Complexity
22

3-
Date: 2019-08-01
3+
Date: 2024-08-01
44

55
## Status
66

docs/adr/0022-add-a-mediator.md

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# 22. Add a Mediator to Brighter
2+
3+
Date: 2024-10-22
4+
5+
## Status
6+
7+
Proposed
8+
9+
## Context
10+
We have two approaches to a workflow: orchestration and choreography. In choreography the workflow emerges from the interaction of the participants. In orchestration, one participant executes the workflow, calling other participants as needed. Whilst choreography has low-coupling, it also has low-cohesion. At scale this can lead to the Pinball anti-pattern, where it is difficult to maintain the workflow.
11+
12+
The [Mediator](https://www.oodesign.com/mediator-pattern) pattern provides an orchestrator that manages a workflow that involves multiple objects. In its simplest form, instead of talking to each other, objects talk to the mediator, which then calls other objects as required to execute the workflow.
13+
14+
Brighter provides `IHandleRequests<>` to provide a handler for an individual request, either a command or an event. It is possible to have an emergent workflow, within Brighter, through the choreography of these handlers. However, Brighter provides no model for an orchestrator that manages a workflow that involves multiple handlers. In particular, Brighter does not support a class that can listen to multiple requests and then call other handlers as required to execute the workflow.
15+
16+
In principle, nothing stops an end user from implementing a `Mediator` class that listens to multiple requests and then calls other handlers as required to execute the workflow. So orchestration has always been viable, but left as an exercise to the user. However, competing OSS projects provide popular workflow functionality, suggesting there is demand for an off-the-shelf solution.
17+
18+
Other dotnet messaging platforms erroneously conflate the Saga and Mediator patterns. A Saga is a long-running transaction that spans multiple services. A Mediator is an orchestrator that manages a workflow that involves multiple objects. One aspect of those implementations is typically the ability to store workflow state.
19+
20+
There is a pattern catalogue associated with workflows. [Workflow Patterns](http://www.workflowpatterns.com/patterns/control/index.php) describes both basic and advanced patterns for workflows. We intend to use these patters as guidance for our offering, over traditional .NET workflow offerings in competing products such as Mass Transit and NServicBus, which have tended to be ersatz in design.
21+
22+
A particular reference for the requirements for this work is [AWS step functions](https://states-language.net/spec.html). AWS Step functions provide a state machine that mediates calls to AWS Lambda functions. When thinking about Brighter's `IHandleRequests` it is attractive to compare them to Lambda functions in the Step functions model :
23+
24+
1. The AWS Step funcions state machine does not hold the business logic, that is located in the functions called; the Step function handles calling the Lambda functions and state transitions (as well as error paths)
25+
2. We want to use the Mediator to orchestrate both internal bus and external bus hosted workflows. Step functions provide a useful model of requirements for the latter.
26+
27+
This approach is intended to enable flexible, event-driven workflows that can handle various business processes and requirements, including asynchronous event handling and conditional branching.
28+
29+
Our experience has been that many teams adopt Step Functions to gain access to it as a workflow engine. But this forces them into Lambda Pinball architectures. We believe that Brighter could offer a compelling alternative.
30+
31+
## Decision
32+
33+
We will add a `Mediator` class to Brighter that will:
34+
35+
1. Manages and tracks a WorkflowState object representing the current step in the workflow.
36+
2. Support multiple steps: sequence, choice, parallel, wait.
37+
3. Supports multiple tasks, mapped to typical ws-messaging patterns including:
38+
• FireAndForget: Dispatches a `Command` and immediately advances to the next state.
39+
• RequestReaction: Dispatches a `Command` and waits for an event response before advancing.
40+
• RobustRequestReaction: Reaction event can kick off an error flow.
41+
4. Uses a CommandProcessor for routing commands and events to appropriate handlers.
42+
5. Work is handled within Brighter handlers. They use glue code to call back to the workflow where necessary
43+
6. Can be passed events, and uses the correlation IDs to match events to specific workflow instances and advance the workflow accordingly.
44+
45+
The Specification Pattern in a Choice step will allow flexible conditional logic by combining specifications with And and Or conditions, enabling complex branching decisions within the workflow.
46+
47+
We assume that the initial V10 of Brighter will contain a minimum viable product version of the `Mediator`. Additional functionality, workflows, etc. will be a feature of later releases. Broady our goal within V10 would be to ensure that from [Workflow Patterns](http://www.workflowpatterns.com/patterns/control/index.php) we can deliver the Basic Control Flow patterns. A stretch goal would be to offer some Iteration and Cnacellation patterns.
48+
49+
## Consequences
50+
51+
Positive Consequences
52+
53+
1. Simplicity: Providing orchestration for a workflow, which is easier to understand
54+
2. Modularity: It is possible to extend the `Mediator' relativey easy by adding new process states.
55+
56+
Negative Consequences
57+
58+
1. Increased Brighter scope: Previously we had assumed that developers would use an off-the-shelf workflow solution like [Stateless](https://github.com/nblumhardt/stateless) or [Workflow Core]. The decision to provide our own workflow, to orchestrate via CommandProcessor means that we increase our scope to include the complexity of workflow management.
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# 22. Add the Specification Pattern
2+
3+
Date: 2024-11-09
4+
5+
## Status
6+
7+
Proposed
8+
9+
## Context
10+
11+
The Specification Pattern is a software design pattern that is used to define business rules that can be combined to create complex rules. It is used to encapsulate business rules that can be used to determine if an object meets a certain criteria. The pattern was described by Eric Evans and Martin Fowler in [this article](https://martinfowler.com/apsupp/spec.pdf).
12+
13+
Brighter needs the addition of the specification pattern, for two reasons:
14+
15+
1. For use with its Mediator. The Mediator allows Brighter to execute a workflow that has a branching condition. The Specification Pattern can be used to define the branching conditions. See [ADR-0022](0022-use-the-mediator-pattern.md).
16+
2. For use when implementing the [Agreement Dispatcher](https://martinfowler.com/eaaDev/AgreementDispatcher.html) pattern from Martin Fowler. The Agreement Dispatcher pattern is used to dispatch a message to a handler based on a set of criteria. The Specification Pattern can be used to define the criteria.
17+
18+
## Decision
19+
Add the Specification Pattern to Brighter. We could have taken a dependency on an off-the-shelf implementation. Many of the Brighter team worked at Huddle Engineering, and worked on [this](https://github.com/HuddleEng/Specification) implementation of the Specification Pattern. However, this forces Brighter to take a dependency on another project, and we would like to keep Brighter as self-contained as possible. So, whilst we may be inspired by Huddle's implementation, we will write our own.
20+
21+
In this version, we don't need some of the complexity of Huddle's usage of the Visitor pattern, as we only need to control branching. In addition, Huddle's version was written before the wide usage of lambda expressions via delegates in C#, so we can simplify the implementation.
22+
23+
## Consequences
24+
25+
Brighter will provide an implementation of the Specification pattern.
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# ADR: Implementing Parallel Split Step for Concurrent Workflow Execution
2+
3+
## Status
4+
5+
Proposed
6+
7+
## Context
8+
9+
Our workflow currently supports sequential steps executed in a single thread of control. Each step in the workflow proceeds one after another, and the Mediator has been designed with this single-threaded assumption.
10+
11+
To support more advanced control flow, we want to introduce a Parallel Split Step based on the Workflow Patterns Basic Control Flow Patterns. The Parallel Split Step is defined as “the divergence of a branch into two or more parallel branches, each of which execute concurrently.” This will enable the workflow to branch into parallel paths, executing multiple threads of control simultaneously. Each branch will operate independently of the others, continuing the workflow until either completion or a synchronization step (such as a Simple Merge) later in the process.
12+
13+
We would expect a some point to implement the Simple Merge step to allow parallel branches to converge back into a single thread of control. However, this ADR will focus on the Parallel Split Step implementation, with the understanding that future steps will be added to support synchronization.
14+
15+
### Key Requirements
16+
1. Parallel Execution:
17+
* Parallel Split Step must initiate two or more parallel branches within the workflow.
18+
* Each branch should proceed as a separate thread of control, executing steps independently.
19+
2. Concurrency Handling in the Mediator:
20+
* The Mediator needs to manage multiple threads of execution rather than assuming a single-threaded flow.
21+
* It must be able to initiate and track multiple branches for each Parallel Split Step within the workflow.
22+
3. State Persistence for Parallel Branches:
23+
* Workflow state management and persistence will need to be adapted to track the branches of the flow.
24+
* In the case of a crash, each branch should be able to resume from its last saved state.
25+
4. Integration with Future Synchronization Steps:
26+
* The Parallel Split Step should integrate seamlessly with a future Simple Merge step, which will allow parallel branches to converge back into a single thread.
27+
28+
## Decision
29+
1. Parallel Split Step Implementation:
30+
* Introduce a new class, ParallelSplitStep<TData>, derived from Step<TData>.
31+
* Ths class will define multiple branches by specifying two or more independent workflow sequences to be executed in parallel.
32+
2. Producer and Consumer Model for Parallel Execution
33+
* The Mediator will now consist of two classes: a producer (Scheduler) and a consumer (Runner).
34+
* Scheduling a workflow via the Scheduler causes it to send a job to a shared channel or blocking collection.
35+
* The Runner class will act as a consumer, reading workflow jobs from the channel and executing them.
36+
* The Runner is single-threaded, and runs a message pump to process jobs sequentially.
37+
* The job queue is bounded to prevent excessive memory usage and ensure fair scheduling.
38+
* The user can configure the job scheduler for backpressure (producer stalls) or load shedding (dropping jobs).
39+
* The user configures the number of Runners; we don't just pull them from the thread pool. This allows users to control how many threads are used to process jobs. For example, a user could configure a single Runner for a single-threaded workflow, or multiple Runners for parallel execution.
40+
3. In the In-Memory version the job channels will be implemented using a BlockingCollection<T> with a bounded capacity.
41+
* We won't separately store workflow data in a database; the job channel is the storage for work to be done, or in flight
42+
* When we branch, we schedule onto the same channel; this means a Runner has a dependency on the Mediator
43+
4. For resilience, we will need to use a persistent queue for the workflow channels.
44+
* We assume that workflow will become unlocked when their owning Runner crashes, allowing another runner to pick them up
45+
* We will use one derived from a database, not a message queue.
46+
* This will be covered in a later ADR, and likely create some changes
47+
48+
## Consequences
49+
50+
### Positive Consequences
51+
* Concurrency and Flexibility: The addition of Parallel Split allows workflows to handle concurrent tasks and enables more complex control flows.
52+
* Scalability: Running parallel branches improves throughput, as tasks that are independent of each other can execute simultaneously.
53+
* Adaptability for Future Steps: Implementing parallel branching prepares the workflow for synchronization steps (e.g., Simple Merge), allowing flexible convergence of parallel tasks.
54+
* Resilience:
55+
56+
### Negative Consequences
57+
* Increased Complexity in State Management: Tracking multiple branches requires more complex state management to ensure each branch persists and resumes accurately.
58+
* Concurrency Overhead in the Mediator: Managing multiple threads of control adds overhead. We now have both a Runner and a Scheduler.
59+
60+
### Use of Middleware or Db for The Job Channel
61+
* We could use a middleware library to manage the job channel. Brighter itself manages a queue or stream of work with a single-threaded pump
62+
* This would mean the scheduler uses the commandprocessor to deposit a job on a queue, and the runner would be our existing message pump, which would pass to a job handler that executed the workflow.
63+
* The alternative here is to use the database as the job channel. This would mean that the scheduler would write a job to the database, and the runner would read from the database.
64+
* For now, we defer this decision to a later ADR. First we want to understand the whole scope of the work, through an in-memory implementation, then we will determine what an out-of-process implementation would look like.
65+
66+
### Merge of parallel branches
67+
* Future ADR for implementing Simple Merge Step for synchronization of parallel branches.
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# 25. Use Reactive Progamming For Mediator
2+
3+
Date: 2025-01-13
4+
5+
## Status
6+
7+
Accepted
8+
9+
## Context
10+
11+
We have scenarios in any workflow where we need to split and then later merge. Our decision to handle the split in
12+
[0024](./0024-add-parallel-split-to-mediator.md) led us on the path to seperating a scheduler and a runner - a
13+
classic producer and consumer pattern. We can use `Channels` (or a `BlockingCollection`) in dotnet to support the
14+
implementation of an internal producer-consumer (as opposed to one using messaging.
15+
16+
Our approach to resolve split was simply to have one channel for the workflow to be scheduled on, so that we could
17+
schedule the splits back to the channel. We don't have a solution for merging those splits.
18+
19+
We also have an approach to waiting for an external event, that we halt the flow, save it's state, and then reschedule
20+
once we are notified of the event we are waiting for. This works well for a single event, but external. It works
21+
less well for multiple events, or internal events, that go best over a channel.
22+
23+
## Decision
24+
25+
We will move to a Flow Based Programming approach to implementing the work. Each `Step<>` in the workflow will
26+
derive from a new type `Component`.
27+
28+
As a FBP component it has an `In` port, an instance of `IAmAJobChannel`. When a component is activated it runs a
29+
message pump to read work from the `In` port, until the port is marked as completed. Once there is no more work, the
30+
`Component` deactivates. A component should save state before it deactivates, to indicate that it was completed.
31+
32+
An `Out` port is actually a call to the next component. Putting work on the `Out`port activates the next component
33+
and puts work on its `In` port.
34+
35+
```
36+
--> [In][Component][Out] -->
37+
```
38+
39+
On a split, there is an array of `Out` ports to write to, instead of a single port. Generically then we require an
40+
overload of any Out method call on the base 'Component' that takes an array of `IAmAJobChannel`
41+
42+
```
43+
--> [In][Component][Out...] -->
44+
```
45+
46+
On a merge that is an array of 'In' ports to write to, instead of a single port. We may force you to wait for
47+
everything to arrive before continuing, or allow you to proceed as soon as you arrive in the joined flow.
48+
49+
We may choose to use the FBP brackets approach to any merge. The upstream sends an 'opening bracket' to 'In'
50+
indicating a sequence follows. The 'bracket' indicates whether we are 'WaitAll' or 'WaitAny' and the channels to
51+
listen on. The downstream component then listens to those channels, until they complete, and obeys the
52+
'WaitAll' or 'WaitAny' as appropriate.
53+
54+
For configuration of a downstream a component needs an `Opt` channel which can take generic configuration information
55+
(most likely the payload here is a `Configuration` class with an `object` payload).
56+
57+
## Consequences
58+
59+
FBP is stongly aligned with workflows, so adopting concepts from FBP gives us a strong programming model to work with.
60+
FBP has already solved many of the problems around running workflows, so it gives us a strong plan to work with.
61+
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Environment file for user defined variables in docker-compose.yml
2+
3+
# 1. CONFIG_PATH: Path to Config.json file
4+
# Ex: CONFIG_PATH="C:\\Config\\Config.json"
5+
CONFIG_PATH="./Config.json"
6+
7+
# 2. ACCEPT_EULA: Pass 'Y' to accept license terms for Azure SQL Edge and Azure Service Bus emulator.
8+
# Service Bus emulator EULA : https://github.com/Azure/azure-service-bus-emulator-installer/blob/main/EMULATOR_EULA.txt
9+
# SQL Edge EULA : https://go.microsoft.com/fwlink/?linkid=2139274
10+
ACCEPT_EULA="Y"
11+
12+
# 3. MSSQL_SA_PASSWORD to be filled by user as per policy : https://learn.microsoft.com/en-us/sql/relational-databases/security/strong-passwords?view=sql-server-linux-ver16
13+
SQL_PASSWORD: "Password1!"

0 commit comments

Comments
 (0)