11using System ;
2+ using System . Linq ;
23using System . Threading . Tasks ;
34using Microsoft . Extensions . Logging ;
45using Moq ;
56using NUnit . Framework ;
67using ViennaNET . Diagnostic ;
7- using ViennaNET . Messaging . Configuration ;
88using ViennaNET . Messaging . Context ;
99using ViennaNET . Messaging . Messages ;
1010using ViennaNET . Messaging . Processing . Impl . Subscribe ;
11+ using ViennaNET . Messaging . Tests . Unit . DSL ;
1112
1213namespace ViennaNET . Messaging . Tests . Unit . Processing . Impl . Subscribe
1314{
@@ -19,7 +20,7 @@ public class QueueSubscribedReactorBaseTests
1920 public void StartProcessing_NoErrorsOnConnection_ReturnsTrue ( )
2021 {
2122 // arrange
22- var reactor = GetReactor ( null , out var fakeAdapter , out var fakeHealthService ) ;
23+ var reactor = GetReactor ( ) ;
2324
2425 // act
2526 var result = reactor . StartProcessing ( ) ;
@@ -32,9 +33,10 @@ public void StartProcessing_NoErrorsOnConnection_ReturnsTrue()
3233 public void StartProcessing_AdapterAlreadyConnected_AdapterConnectNotCalled ( )
3334 {
3435 // arrange
35- var reactor = GetReactor ( true , out var fakeAdapter , out var fakeHealthService ) ;
36+ var fakeAdapter = Given . MessageAdapter . MockPlease < IMessageAdapterWithSubscribing > ( ) ;
3637 fakeAdapter . Setup ( x => x . IsConnected )
3738 . Returns ( true ) ;
39+ var reactor = GetReactor ( messageAdapter : fakeAdapter . Object ) ;
3840 // act
3941 var result = reactor . StartProcessing ( ) ;
4042
@@ -47,9 +49,11 @@ public void StartProcessing_AdapterAlreadyConnected_AdapterConnectNotCalled()
4749 public void StartProcessing_AdapterThrowsExceptionOnConnect_FailReturnsFalse ( )
4850 {
4951 // arrange
50- var reactor = GetReactor ( true , out var fakeAdapter , out var fakeHealthService ) ;
52+ var fakeAdapter = Given . MessageAdapter . MockPlease < IMessageAdapterWithSubscribing > ( ) ;
5153 fakeAdapter . Setup ( x => x . Connect ( ) )
5254 . Throws < Exception > ( ) ;
55+ var reactor = GetReactor ( messageAdapter : fakeAdapter . Object ) ;
56+
5357 // act
5458 var result = reactor . StartProcessing ( ) ;
5559
@@ -62,59 +66,107 @@ public void StartProcessing_AdapterThrowsTimeoutExceptionAndDoNotThrowsOnRetry_S
6266 {
6367 // arrange
6468 var isConnectCalled = false ;
65- var reactor = GetReactor ( true , out var fakeAdapter , out var fakeHealthService ) ;
69+ var fakeAdapter = Given . MessageAdapter . MockPlease < IMessageAdapterWithSubscribing > ( ) ;
6670 fakeAdapter . Setup ( x => x . Connect ( ) )
67- . Callback ( ( ) =>
71+ . Callback ( ( ) =>
6872 {
6973 if ( ! isConnectCalled )
7074 {
7175 isConnectCalled = true ;
7276 throw new TimeoutException ( ) ;
7377 }
7478 } ) ;
75-
79+ var reactor = GetReactor ( messageAdapter : fakeAdapter . Object ) ;
80+
7681 // act
7782 var result = reactor . StartProcessing ( ) ;
7883
7984 // assert
8085 Assert . That ( result , Is . True ) ;
8186 }
8287
83- private QueueSubscribedReactorBase GetReactor (
84- bool ? serviceHealthDependent , out Mock < IMessageAdapterWithSubscribing > fakeAdapter ,
85- out Mock < IHealthCheckingService > fakeHealthService )
88+ [ Test ]
89+ public async Task StartProcessing_ConcurrentMessages_ShouldProcessSuccessfully ( )
90+ {
91+ // arrange
92+ var messageAdapter = Given . MessageAdapter . Please < IMessageAdapterWithSubscribing > ( ) ;
93+ var reactor = GetReactor ( messageAdapter : messageAdapter ) ;
94+ reactor . StartProcessing ( ) ;
95+
96+ // act & assert
97+ Task SendMessage ( ) => Task . Run ( ( ) => messageAdapter . Send ( new TextMessage ( ) ) ) ;
98+ var tasks = Enumerable . Range ( 1 , 10 ) . Select ( _ => SendMessage ( ) ) ;
99+ await Task . WhenAll ( tasks ) ;
100+ }
101+
102+ [ Test ]
103+ public void StartProcessing_ClearCallContextFailed_ShouldProcessMessage ( )
104+ {
105+ var messageAdapter = Given . MessageAdapter . Please < IMessageAdapterWithSubscribing > ( ) ;
106+ var messagingCallContextAccessorMock = new Mock < IMessagingCallContextAccessor > ( ) ;
107+ messagingCallContextAccessorMock . Setup ( x => x . CleanContext ( ) ) . Throws < Exception > ( ) ;
108+ var reactor = GetReactor ( messageAdapter : messageAdapter ,
109+ messagingCallContextAccessor : messagingCallContextAccessorMock . Object ) ;
110+
111+ reactor . StartProcessing ( ) ;
112+ messageAdapter . Send ( new TextMessage ( ) ) ;
113+
114+ Assert . That ( reactor . WasProcessed , Is . True ) ;
115+ }
116+
117+ [ Test ]
118+ public void StartProcessing_WhenDiagnosticFailed_ShouldUnsubscribe ( )
86119 {
87- fakeAdapter = new Mock < IMessageAdapterWithSubscribing > ( ) ;
88- fakeAdapter . Setup ( x => x . Configuration )
89- . Returns ( new Mock < QueueConfigurationBase > ( ) . Object ) ;
90- fakeHealthService = new Mock < IHealthCheckingService > ( ) ;
91- var fakeCallContextAccessor = new Mock < IMessagingCallContextAccessor > ( ) ;
92-
93- return new QueueSubscribedReactorWrapper ( fakeAdapter . Object , 100 , serviceHealthDependent , fakeHealthService . Object ,
94- fakeCallContextAccessor . Object ) ;
120+ var messageAdapter = Given . MessageAdapter . MockPlease < IMessageAdapterWithSubscribing > ( ) ;
121+ var healthCheckingServiceMock = new Mock < IHealthCheckingService > ( ) ;
122+ var reactor = GetReactor ( messageAdapter : messageAdapter . Object ,
123+ healthService : healthCheckingServiceMock . Object ) ;
124+
125+ reactor . StartProcessing ( ) ;
126+ healthCheckingServiceMock . Raise ( x => x . DiagnosticFailedEvent += null ) ;
127+
128+ messageAdapter . Verify ( x => x . Unsubscribe ( ) ) ;
95129 }
96130
131+ private static QueueSubscribedReactorWrapper GetReactor (
132+ bool ? serviceHealthDependent = true ,
133+ IMessageAdapterWithSubscribing messageAdapter = null ,
134+ IHealthCheckingService healthService = null ,
135+ IMessagingCallContextAccessor messagingCallContextAccessor = null )
136+ => new QueueSubscribedReactorWrapper ( messageAdapter ?? Given . MessageAdapter . Please < IMessageAdapterWithSubscribing > ( ) ,
137+ 100 ,
138+ serviceHealthDependent ,
139+ healthService ?? Mock . Of < IHealthCheckingService > ( ) ,
140+ messagingCallContextAccessor ?? Mock . Of < IMessagingCallContextAccessor > ( ) ) ;
141+
97142 private class QueueSubscribedReactorWrapper : QueueSubscribedReactorBase
98143 {
99144 public QueueSubscribedReactorWrapper (
100- IMessageAdapterWithSubscribing messageAdapter , int reconnectTimeout , bool ? serviceHealthDependent ,
101- IHealthCheckingService healthCheckingService , IMessagingCallContextAccessor messagingCallContextAccessor ) : base ( messageAdapter ,
102- reconnectTimeout ,
103- serviceHealthDependent ,
104- healthCheckingService ,
105- messagingCallContextAccessor ,
106- Mock . Of < ILogger > ( ) )
145+ IMessageAdapterWithSubscribing messageAdapter ,
146+ int reconnectTimeout ,
147+ bool ? serviceHealthDependent ,
148+ IHealthCheckingService healthCheckingService ,
149+ IMessagingCallContextAccessor messagingCallContextAccessor ) : base ( messageAdapter ,
150+ reconnectTimeout ,
151+ serviceHealthDependent ,
152+ healthCheckingService ,
153+ messagingCallContextAccessor ,
154+ Mock . Of < ILogger > ( ) )
107155 {
108156 }
157+
158+ public bool WasProcessed { get ; private set ; }
109159
110160 protected override bool GetProcessedMessage ( BaseMessage message )
111161 {
112- throw new System . NotImplementedException ( ) ;
162+ WasProcessed = true ;
163+ return true ;
113164 }
114165
115166 protected override Task < bool > GetProcessedMessageAsync ( BaseMessage message )
116167 {
117- throw new System . NotImplementedException ( ) ;
168+ WasProcessed = true ;
169+ return Task . FromResult ( true ) ;
118170 }
119171 }
120172 }
0 commit comments