11using Azure . Messaging . ServiceBus ;
22using servicebus_cli . Services ;
3- using Spectre . Console ;
43
54namespace servicebus_cli . Subjects . Deadletter . Actions ;
65
@@ -44,15 +43,14 @@ public async Task Resend(List<string> args)
4443
4544 _consoleService . WriteMarkup ( $ "[grey]Selected fully qualified namespace: { fullyQualifiedNamespace } [/]") ;
4645
47- var queues = await AnsiConsole . Status ( )
48- . StartAsync ( $ "Fetching queues on { fullyQualifiedNamespace } ...", async ctx =>
49- {
50- ctx . Spinner ( Spinner . Known . Dots ) ;
51- ctx . SpinnerStyle ( Style . Parse ( "yellow" ) ) ;
52-
53- return await _serviceBusService . GetInformationAboutAllQueues ( fullyQualifiedNamespace ) . ConfigureAwait ( false ) ;
54- } ) ;
46+ var getQueuesWorkload = async ( ) =>
47+ {
48+ return await _serviceBusService . GetInformationAboutAllQueues ( fullyQualifiedNamespace ) . ConfigureAwait ( false ) ;
49+ } ;
5550
51+ var queues = await _consoleService . ProcessWorkloadWithSpinner (
52+ $ "Fetching queues on { fullyQualifiedNamespace } ...",
53+ getQueuesWorkload ) ;
5654
5755 var selectedQueue = await _consoleService . PromptSelection (
5856 "Select a [green]queue[/]:" ,
@@ -82,36 +80,31 @@ public async Task Resend(List<string> args)
8280 }
8381
8482 var queue = await _serviceBusService . ConnectToQueue ( fullyQualifiedNamespace , entityPath ) ;
85- var resentDlCount = 0 ;
86- IReadOnlyList < ServiceBusReceivedMessage > messages ;
8783
88- //TODO: Try to use ProcessWorkloadWithStatusUpdates here
89- await AnsiConsole . Status ( ) . StartAsync ( $ "Resending messages... 0 / { deadletterCount } ", async ctx =>
84+ var resendMessagesWorkload = async ( ) =>
9085 {
91- do
86+ var messages = await queue . DeadletterReceiver . ReceiveMessagesAsync ( 1000 , TimeSpan . FromSeconds ( 30 ) ) ;
87+ var tasks = new List < Task > ( ) ;
88+
89+ foreach ( var message in messages )
9290 {
93- messages = await queue . DeadletterReceiver . ReceiveMessagesAsync ( 1000 , TimeSpan . FromSeconds ( 30 ) ) ;
94- var tasks = new List < Task > ( ) ;
91+ var sendMessage = new ServiceBusMessage ( message ) ;
9592
96- foreach ( var message in messages )
97- {
98- var sendMessage = new ServiceBusMessage ( message ) ;
93+ if ( queue . QueueProperties . RequiresSession ) //Only set session id if the queue supports sessions
94+ sendMessage . SessionId = message . SessionId ;
9995
100- if ( queue . QueueProperties . RequiresSession ) //Only set session id if the queue supports sessions
101- sendMessage . SessionId = message . SessionId ;
96+ tasks . Add ( queue . Sender . SendMessageAsync ( sendMessage ) ) ;
97+ }
98+ await Task . WhenAll ( tasks ) ;
99+ return messages ;
100+ } ;
102101
103- tasks . Add ( queue . Sender . SendMessageAsync ( sendMessage ) ) ;
104- }
105- await Task . WhenAll ( tasks ) ;
106- resentDlCount += messages . Count ;
107- ctx . Status ( $ "Resending messages... { resentDlCount } / { deadletterCount } ") ;
108- } while ( messages . Count > 0 && resentDlCount < deadletterCount ) ;
109- } ) ;
110-
111- if ( resentDlCount > deadletterCount )
112- _consoleService . WriteWarning ( $ "The count of resent messages ({ resentDlCount } ) was greater than the initial deadletter count ({ deadletterCount } ). This may happen due to deadletters being re-sent and ending up on the deadletter queue again before the resend job was able to finish. It is an indicator that there are bad messages on your deadletter queue that should be handled and/or removed instead of resent.") ;
113- else
114- _consoleService . WriteSuccess ( $ "Resent { resentDlCount } messages from deadletter queue { entityPath } ") ;
102+ await _consoleService . ProcessWorkloadWithStatusUpdates < ServiceBusReceivedMessage , IReadOnlyList < ServiceBusReceivedMessage > > (
103+ "Resending" ,
104+ "Resent" ,
105+ "The count of resent messages was greater than the initial deadletter count. This may happen due to deadletters being re-sent and ending up on the deadletter queue again before the resend job was able to finish. It is an indicator that there are bad messages on your deadletter queue that should be handled and/or removed instead of resent. " ,
106+ deadletterCount . Value ,
107+ resendMessagesWorkload ) ;
115108 }
116109
117110 public async Task Purge ( List < string > args )
@@ -142,12 +135,12 @@ public async Task Purge(List<string> args)
142135
143136 _consoleService . WriteMarkup ( $ "[grey]Selected fully qualified namespace: { fullyQualifiedNamespace } [/]") ;
144137
145- var asyncWorkload = async ( ) =>
138+ var purgeMessagesWorkload = async ( ) =>
146139 {
147140 return await _serviceBusService . GetInformationAboutAllQueues ( fullyQualifiedNamespace ) . ConfigureAwait ( false ) ;
148141 } ;
149142
150- var queues = await _consoleService . ProcessWorkloadWithSpinner ( $ "Fetching queues on { fullyQualifiedNamespace } ...", asyncWorkload ) ;
143+ var queues = await _consoleService . ProcessWorkloadWithSpinner ( $ "Fetching queues on { fullyQualifiedNamespace } ...", purgeMessagesWorkload ) ;
151144
152145 var selectedQueue = await _consoleService . PromptSelection (
153146 "Select a [green]queue[/]:" ,
@@ -178,7 +171,7 @@ public async Task Purge(List<string> args)
178171
179172 var queue = await _serviceBusService . ConnectToQueue ( fullyQualifiedNamespace , entityPath ) ;
180173
181- var asyncWorkload2 = async ( ) =>
174+ var deleteMessagesWorkload = async ( ) =>
182175 {
183176 return await queue . DeadletterReceiver . ReceiveMessagesAsync ( 1000 , TimeSpan . FromSeconds ( 30 ) ) ; //Simply receiving messages deletes them as well
184177 } ;
@@ -188,6 +181,6 @@ await _consoleService.ProcessWorkloadWithStatusUpdates<ServiceBusReceivedMessage
188181 "Deleted" ,
189182 "This is usually a sign that there are new deadletter messages arriving while purging. It might be good idea to investigate why this is happening." ,
190183 deadletterCountTotal . Value ,
191- asyncWorkload2 ) ;
184+ deleteMessagesWorkload ) ;
192185 }
193186}
0 commit comments