Skip to content

Commit 0d13773

Browse files
committed
Created an Action to peel dead letter queues
1 parent a23f372 commit 0d13773

6 files changed

Lines changed: 125 additions & 5 deletions

File tree

servicebus-cli/Services/ConsoleService.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
using servicebus_cli.Subjects.Queue.Actions;
44
using servicebus_cli.Subjects.Settings;
55
using Spectre.Console;
6+
using Spectre.Console.Json;
67

78
namespace servicebus_cli.Services;
89

910
public interface IConsoleService
1011
{
1112
Task<bool> ConfirmWarning(string message);
1213
void WriteMarkup(string markup);
14+
void WriteJson(string json);
1315
void WriteError(string markup);
1416
void WriteWarning(string markup);
1517
void WriteSuccess(string markup);
@@ -38,12 +40,18 @@ public class ConsoleService() : IConsoleService
3840

3941
public void WriteMarkup(string markup) => AnsiConsole.MarkupLine(markup);
4042

43+
public void WriteJson(string json)
44+
{
45+
var beautifulJson = new JsonText(json);
46+
AnsiConsole.Write(beautifulJson);
47+
}
48+
4149
public Task<string> PromptForSubject() => PromptSelection("Subject: ", ["deadletter", "queue", "settings", "help"]);
4250

4351
public Task<string> PromptForAction<ActionType>()
4452
{
4553
if (typeof(ActionType) == typeof(DeadletterActions))
46-
return PromptSelection("Action: ", ["resend", "purge"]);
54+
return PromptSelection("Action: ", ["resend", "purge", "peek"]);
4755

4856
if (typeof(ActionType) == typeof(QueueActions))
4957
return PromptSelection("Action: ", ["list"]);

servicebus-cli/Services/ServiceBusService.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ public interface IServiceBusService
1111
Task<List<QueueInformation>> GetInformationAboutAllQueues(string fullyQualifiedNamespace, string filter = "");
1212
Task<long?> GetDeadLetterCount(string fullyQualifiedNamespace, string entityPath);
1313
Task<ServiceBusConnection> ConnectToQueue(string fullyQualifiedNamespace, string entityPath);
14+
Task<IReadOnlyList<ServiceBusReceivedMessage>> PeekDeadLetterMessages(string fullyQualifiedNamespace, string entityPath, int maxMessages = 1000);
1415
}
1516

1617
public class ServiceBusService(IServiceBusRepository _serviceBusRepository) : IServiceBusService
@@ -54,6 +55,24 @@ public async Task<List<QueueInformation>> GetInformationAboutAllQueues(string fu
5455
return properties?.Value?.DeadLetterMessageCount;
5556
}
5657

58+
public async Task<IReadOnlyList<ServiceBusReceivedMessage>> PeekDeadLetterMessages(string fullyQualifiedNamespace, string entityPath, int maxMessages = 1000)
59+
{
60+
var serviceBusClient = _serviceBusRepository.GetServiceBusClient(fullyQualifiedNamespace);
61+
var receiverOptions = new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter };
62+
var receiver = serviceBusClient.CreateReceiver(entityPath, receiverOptions);
63+
64+
var allMessages = new List<ServiceBusReceivedMessage>();
65+
while (allMessages.Count < maxMessages)
66+
{
67+
var batch = await receiver.PeekMessagesAsync(maxMessages - allMessages.Count);
68+
if (batch.Count == 0)
69+
break;
70+
allMessages.AddRange(batch);
71+
}
72+
73+
return allMessages;
74+
}
75+
5776
public async Task<ServiceBusConnection> ConnectToQueue(string fullyQualifiedNamespace, string entityPath)
5877
{
5978
// Get info about queue (might be nice to have)

servicebus-cli/Subjects/Deadletter/Actions/DeadletterActions.cs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
using Azure.Messaging.ServiceBus;
22
using servicebus_cli.Services;
3+
using System.Text.Json;
34

45
namespace servicebus_cli.Subjects.Deadletter.Actions;
56

67
public interface IDeadletterActions
78
{
89
Task Resend(List<string> args);
910
Task Purge(List<string> args);
11+
Task Peek(List<string> args);
1012
}
1113

1214
public class DeadletterActions(
@@ -107,6 +109,93 @@ await _consoleService.ProcessWorkloadWithStatusUpdates<ServiceBusReceivedMessage
107109
resendMessagesWorkload);
108110
}
109111

112+
public async Task Peek(List<string> args)
113+
{
114+
var fullyQualifiedNamespace = "";
115+
var entityPath = "";
116+
var settingsFileContent = _fileService.GetConfigFileContent();
117+
var savedNamespaces = _userSettingsService.Deserialize(settingsFileContent);
118+
119+
switch (args.Count)
120+
{
121+
case 2:
122+
fullyQualifiedNamespace = args[0];
123+
entityPath = args[1];
124+
break;
125+
default:
126+
if (!savedNamespaces.FullyQualifiedNamespaces.Any())
127+
{
128+
fullyQualifiedNamespace = await _consoleService.PromptFreeText(
129+
"Enter the [green]fully qualified namespace[/]:");
130+
}
131+
else
132+
{
133+
fullyQualifiedNamespace = await _consoleService.PromptSelection(
134+
"Select a fully qualified namespace:",
135+
savedNamespaces.FullyQualifiedNamespaces);
136+
}
137+
138+
_consoleService.WriteMarkup($"[grey]Selected fully qualified namespace: {fullyQualifiedNamespace}[/]");
139+
140+
var peekQueuesWorkload = async () =>
141+
{
142+
return await _serviceBusService.GetInformationAboutAllQueues(fullyQualifiedNamespace).ConfigureAwait(false);
143+
};
144+
145+
var queues = await _consoleService.ProcessWorkloadWithSpinner(
146+
$"Fetching queues on {fullyQualifiedNamespace}...",
147+
peekQueuesWorkload);
148+
149+
var selectedQueue = await _consoleService.PromptSelection(
150+
"Select a [green]queue[/]:",
151+
queues.Select(q => $"{q.QueueProperties.Name} ([green]{q.QueueRuntimeProperties.ActiveMessageCount}[/], [red]{q.QueueRuntimeProperties.DeadLetterMessageCount}[/], [blue]{q.QueueRuntimeProperties.ScheduledMessageCount}[/])").ToList(),
152+
enableSearch: true);
153+
154+
entityPath = selectedQueue.Split(' ')[0];
155+
156+
_consoleService.WriteMarkup($"[grey]Selected queue: {entityPath}[/]");
157+
158+
break;
159+
}
160+
161+
var peekWorkload = async () =>
162+
{
163+
return await _serviceBusService.PeekDeadLetterMessages(fullyQualifiedNamespace, entityPath).ConfigureAwait(false);
164+
};
165+
166+
var messages = await _consoleService.ProcessWorkloadWithSpinner(
167+
$"Peeking deadletter messages on {entityPath}...",
168+
peekWorkload);
169+
170+
if (messages.Count == 0)
171+
{
172+
_consoleService.WriteError($"No deadletter messages found in queue {entityPath}");
173+
return;
174+
}
175+
176+
var jsonMessages = messages.Select(m => new
177+
{
178+
m.MessageId,
179+
Body = m.Body?.ToString(),
180+
m.Subject,
181+
m.ContentType,
182+
m.CorrelationId,
183+
m.DeadLetterReason,
184+
m.DeadLetterErrorDescription,
185+
m.DeadLetterSource,
186+
m.EnqueuedTime,
187+
m.ExpiresAt,
188+
m.SequenceNumber,
189+
m.DeliveryCount,
190+
ApplicationProperties = m.ApplicationProperties.ToDictionary(kvp => kvp.Key, kvp => kvp.Value)
191+
});
192+
193+
var jsonOptions = new JsonSerializerOptions { WriteIndented = true };
194+
var json = JsonSerializer.Serialize(jsonMessages, jsonOptions);
195+
196+
_consoleService.WriteJson(json);
197+
}
198+
110199
public async Task Purge(List<string> args)
111200
{
112201
var fullyQualifiedNamespace = "";

servicebus-cli/Subjects/Deadletter/Deadletter.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ public async Task Run(string[] args)
3232
case "purge":
3333
await _deadletterActions.Purge(args.Skip(1).ToList());
3434
break;
35+
case "peek":
36+
await _deadletterActions.Peek(args.Skip(1).ToList());
37+
break;
3538
default:
3639
_helpSubject.Run();
3740
break;

servicebus-cli/Subjects/Queue/Actions/QueueActions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public async Task List(List<string> args)
4545

4646
_consoleService.WriteMarkup($"[grey]Selected fully qualified namespace: {fullyQualifiedNamespace}[/]");
4747

48-
filter = await _consoleService.PromptFreeText("Enter a [green]filter[/] (optional):");
48+
filter = await _consoleService.PromptFreeText("Enter a [green]filter[/] (optional):", allowEmpty: true);
4949

5050
break;
5151
}

servicebus-cli/servicebus-cli.csproj

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@
1010

1111
<ItemGroup>
1212
<PackageReference Include="Azure.Identity" Version="1.12.0" />
13-
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.17.5" />
14-
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
15-
<PackageReference Include="spectre.console" Version="0.50.0" />
13+
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.17.5" />
14+
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
15+
<PackageReference Include="spectre.console" Version="0.54.0" />
16+
<PackageReference Include="Spectre.Console.Json" Version="0.54.0" />
1617
</ItemGroup>
1718

1819
</Project>

0 commit comments

Comments
 (0)