-
Notifications
You must be signed in to change notification settings - Fork 7
Message correlation and casuation
Message-based domain logic debug and profiling is quite tricky, because it is hard to build execution chain based only on code, especially when many pub-sub mechanics are involved.
To provide more control over domain logic it is a common pattern to add data to messages to know it parent and processing flow id. Each message involved in domain logic is enchanced with metadata containing correlation and casuation id.
Casuasion id is id of message, with processing created current message. It is a id of "parent", linking set of messages together like linked list structure.
Correlation id is id of logical process message is beloging to. Correlation id is created somewere marking start of the flow, and is assigned to flow start message. All messages created while flow is processed will share same correlation id.
GridDomain enrich all messages involved in domain logic with metadata, containing correlation and casuation id. It includes messages between aggregates, sagas and custim handlers.
Correlatin Id is determined with all new commands except created in Sagas \ Future events, and allows to track execution flow from command start to all events persisted, even after many sagas work.
In addition message metadata contains processing history for debuggin purposes.
To add metadata to message, use corresponding method in IPublisher: public interface IPublisher{ void Publish(object message, IMessageMetadata metadata) }
Messages inside GridDomain are epassed in envelop IMessageMetadataEnvelop, and automaticaly unwraps when nececcary. Usially it is message waiter after command execution.
Correlation id allows to identify domain events produced by particular command. When we executing a command and want to wait for some produced messages, we can use correlation id to be sure events were created by our initial command. This possibility allows to avoid additional filters when waiting for command events.
New fluent API for command execution uses correlation id properties, and we can write a code like
var result = await GridNode.PrepareCommand(cmd)
.Expect<SampleEvent>()
.Execute();
and will be sure we will receive SampleEvent only from our command without additional filters.
New interface for command waiting was introduced, it is simpler then existing ICommandWaiterFactory and guarantee to clients that correlation id mathcing will be performed by default:
Same factory as old one, IMessageWaiterFactory:
public interface ICommandWaiterFactory
{
ICommandWaiter PrepareCommand<T>(T cmd, IMessageMetadata metadata=null) where T:ICommand;
}
public interface ICommandWaiter
{
ICommandExpectBuilder Expect<TMsg>(Predicate<TMsg> filter = null);
ICommandExpectBuilder Expect(Type type, Func<object, bool> filter = null);
}
expect builder for command execution is like regular IExpectBuilder, but allows command execution without calling method Create:
public interface ICommandExpectBuilder
{
Task<IWaitResults> Execute(TimeSpan? timeout = null, bool failOnAnyFault = true);
ICommandExpectBuilder And<TMsg>(Predicate<TMsg> filter = null);
ICommandExpectBuilder Or<TMsg>(Predicate<TMsg> filter = null);
}
All xpect builders taking care of wrapping, subscribing and unwrapping metadata envelop from messages to allow user to use bare domai evet. ICommandExpectBuilder implementations can provide results and for bare message and for enveloped. Message waiters provide transient metadata envelop unwrapping for backward compatability, e.g. IMessageWaiter.Expect will listen for message T and IMessagaMetadataEnvelop, and we can ask for results for T and for its metadata with MessageWithMetadata extensions method
var res = await GridNode.PrepareCommand(_command, _commandMetadata)
.Expect<JobFailed>()
.And<IFault<RaiseScheduledDomainEventCommand>>()
.Execute(TimeSpan.FromSeconds(30));
_schedulingCommandFault = res.MessageIFault<RaiseScheduledDomainEventCommand>>>();
_jobFailedEnvelop = res.MessageWithMetadata<JobFailed>();