55using System . Threading . Tasks ;
66using ActiveMQ . Artemis . Client . Extensions . DependencyInjection ;
77using ActiveMQ . Artemis . Client . TestUtils ;
8+ using NScenario ;
89using Xunit ;
910using Xunit . Abstractions ;
1011
@@ -110,33 +111,69 @@ public async Task Should_create_shared_consumer()
110111 [ Fact ]
111112 public async Task Should_create_shared_durable_consumer ( )
112113 {
114+ var scenario = TestScenarioFactory . Default ( new XUnitOutputAdapter ( _testOutputHelper ) ) ;
115+
113116 var address = Guid . NewGuid ( ) . ToString ( ) ;
114117 var queue = Guid . NewGuid ( ) . ToString ( ) ;
115-
116118 var dictionary = new ConcurrentDictionary < int , bool > ( ) ;
117-
118- await using var testFixture = await TestFixture . CreateAsync ( _testOutputHelper , builder =>
119+
120+ var testFixture1 = await scenario . Step ( "Register two shared durable consumers" , async ( ) =>
119121 {
120- builder . AddSharedDurableConsumer ( address , queue , async ( message , consumer , _ , _ ) =>
122+ return await TestFixture . CreateAsync ( _testOutputHelper , builder =>
121123 {
122- dictionary . TryAdd ( 1 , true ) ;
123- await consumer . AcceptAsync ( message ) ;
124- } ) ;
125- builder . AddSharedDurableConsumer ( address , queue , async ( message , consumer , _ , _ ) =>
126- {
127- dictionary . TryAdd ( 2 , true ) ;
128- await consumer . AcceptAsync ( message ) ;
124+ builder . AddSharedDurableConsumer ( address , queue , ( _ , _ , _ , _ ) =>
125+ {
126+ dictionary . TryAdd ( 1 , true ) ;
127+ return Task . CompletedTask ;
128+ } ) ;
129+ builder . AddSharedDurableConsumer ( address , queue , ( _ , _ , _ , _ ) =>
130+ {
131+ dictionary . TryAdd ( 2 , true ) ;
132+ return Task . CompletedTask ;
133+ } ) ;
129134 } ) ;
130135 } ) ;
131136
132- await using var producer = await testFixture . Connection . CreateProducerAsync ( address , RoutingType . Multicast , testFixture . CancellationToken ) ;
133- await producer . SendAsync ( new Message ( "foo" ) , testFixture . CancellationToken ) ;
134- await producer . SendAsync ( new Message ( "foo" ) , testFixture . CancellationToken ) ;
137+ await scenario . Step ( "Send two messages" , async ( ) =>
138+ {
139+ await using var producer = await testFixture1 . Connection . CreateProducerAsync ( address , RoutingType . Multicast , testFixture1 . CancellationToken ) ;
140+ await producer . SendAsync ( new Message ( "foo" ) , testFixture1 . CancellationToken ) ;
141+ await producer . SendAsync ( new Message ( "foo" ) , testFixture1 . CancellationToken ) ;
142+ } ) ;
135143
136- Assert . Equal ( 2 , await Retry . RetryUntil (
137- ( ) => Task . FromResult ( dictionary . Keys . Count ) ,
138- x => x == 2 ,
139- TimeSpan . FromMilliseconds ( 100 ) ) ) ;
144+ await scenario . Step ( "Verify that messages were distributed among two consumers" , async ( ) =>
145+ {
146+ Assert . Equal ( 2 , await Retry . RetryUntil (
147+ func : ( ) => Task . FromResult ( dictionary . Keys . Count ) ,
148+ until : x => x == 2 ,
149+ timeout : TimeSpan . FromMilliseconds ( 100 ) ) ) ;
150+ } ) ;
151+
152+ await scenario . Step ( "Close initial consumers without acknowledging messages" , async ( ) =>
153+ {
154+ await testFixture1 . DisposeAsync ( ) ;
155+ dictionary . Clear ( ) ;
156+ } ) ;
157+
158+ await using var testFixture2 = await scenario . Step ( "Register a new consumer" , async ( ) =>
159+ {
160+ return await TestFixture . CreateAsync ( _testOutputHelper , builder =>
161+ {
162+ builder . AddSharedDurableConsumer ( address , queue , async ( message , consumer , _ , _ ) =>
163+ {
164+ dictionary . TryAdd ( 1 , true ) ;
165+ await consumer . AcceptAsync ( message ) ;
166+ } ) ;
167+ } ) ;
168+ } ) ;
169+
170+ await scenario . Step ( "Verify that messages were redelivered to the new consumer - queue was durable" , async ( ) =>
171+ {
172+ Assert . Equal ( 1 , await Retry . RetryUntil (
173+ func : ( ) => Task . FromResult ( dictionary . Keys . Count ) ,
174+ until : x => x == 1 ,
175+ timeout : TimeSpan . FromMilliseconds ( 100 ) ) ) ;
176+ } ) ;
140177 }
141178 }
142179}
0 commit comments