Skip to content

Commit 831b78a

Browse files
committed
Test coverage applied
1 parent c08635c commit 831b78a

19 files changed

Lines changed: 969 additions & 234 deletions

File tree

src/KeeperData.Core/Exceptions/NonRetryableException.cs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@ namespace KeeperData.Core.Exceptions;
22

33
public class NonRetryableException : Exception
44
{
5-
public NonRetryableException(string message, Exception inner) : base(message, inner)
6-
{
7-
}
85
public NonRetryableException(string message) : base(message)
96
{
107
}

src/KeeperData.Core/Exceptions/RetryableException.cs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@ namespace KeeperData.Core.Exceptions;
22

33
public class RetryableException : Exception
44
{
5-
public RetryableException(string message, Exception inner) : base(message, inner)
6-
{
7-
}
85
public RetryableException(string message) : base(message)
96
{
107
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
using Amazon.SQS.Model;
2+
using KeeperData.Core.Messaging.Contracts;
3+
4+
namespace KeeperData.Core.Messaging.Extensions;
5+
6+
public static class MessageAttributes
7+
{
8+
private const string MESSAGE_SUFFIX = "Message";
9+
10+
public static T? GetMessageAttributeValue<T>(this Message message, string key)
11+
{
12+
if (message.MessageAttributes is null || !message.MessageAttributes.TryGetValue(key, out var attribute))
13+
return default;
14+
15+
return ParseAttributeValue<T>(attribute.DataType, attribute.StringValue);
16+
}
17+
18+
public static T? GetMessageAttributeValue<T>(this SnsEnvelope envelope, string key)
19+
{
20+
if (envelope.MessageAttributes is null || !envelope.MessageAttributes.TryGetValue(key, out var attribute))
21+
return default;
22+
23+
return ParseAttributeValue<T>(attribute.Type, attribute.Value);
24+
}
25+
26+
private static T? ParseAttributeValue<T>(string type, string? raw)
27+
{
28+
if (raw is null) return default;
29+
30+
if (typeof(T) == typeof(string) && type == "String")
31+
return (T)(object)raw;
32+
33+
if (typeof(T) == typeof(int) && type == "Number" && int.TryParse(raw, out var intVal))
34+
return (T)(object)intVal;
35+
36+
if (typeof(T) == typeof(double) && type == "Number" && double.TryParse(raw, out var doubleVal))
37+
return (T)(object)doubleVal;
38+
39+
return default;
40+
}
41+
42+
public static string ReplaceSuffix(this string? messageName)
43+
{
44+
return messageName?.EndsWith(MESSAGE_SUFFIX) == true
45+
? messageName[..^MESSAGE_SUFFIX.Length]
46+
: messageName ?? string.Empty;
47+
}
48+
}

src/KeeperData.Core/Messaging/Extensions/MessageExtensions.cs

Lines changed: 0 additions & 46 deletions
This file was deleted.

src/KeeperData.Core/Messaging/Extensions/SqsMessageUnwrapper.cs

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,32 +10,52 @@ public static UnwrappedMessage Unwrap(this Message message, IMessageSerializer<S
1010
{
1111
ArgumentNullException.ThrowIfNull(message);
1212

13+
var envelope = TryDeserializeEnvelope(messageSerializer, message);
14+
15+
return envelope?.Type == "Notification"
16+
? UnwrapFromEnvelope(envelope)
17+
: UnwrapFromRawMessage(message);
18+
}
19+
20+
private static SnsEnvelope? TryDeserializeEnvelope(IMessageSerializer<SnsEnvelope> serializer, Message message)
21+
{
1322
try
1423
{
15-
var envelope = messageSerializer.Deserialize(message);
16-
if (envelope?.Type == "Notification")
17-
{
18-
return new UnwrappedMessage
19-
{
20-
MessageId = envelope.MessageId,
21-
CorrelationId = envelope.GetMessageAttributeValue<string>("CorrelationId") ?? string.Empty,
22-
Subject = (envelope.GetMessageAttributeValue<string>("Subject") ?? "Default").ReplaceSuffix(),
23-
Payload = envelope.Message,
24-
Attributes = envelope.MessageAttributes?.ToDictionary(
25-
kvp => kvp.Key,
26-
kvp => kvp.Value?.Value ?? string.Empty)
27-
};
28-
}
24+
return serializer.Deserialize(message);
2925
}
3026
catch
3127
{
28+
return null;
3229
}
30+
}
31+
32+
private static UnwrappedMessage UnwrapFromEnvelope(SnsEnvelope envelope)
33+
{
34+
var subject = envelope.GetMessageAttributeValue<string>("Subject") ?? "Default";
35+
var correlationId = envelope.GetMessageAttributeValue<string>("CorrelationId") ?? string.Empty;
36+
37+
return new UnwrappedMessage
38+
{
39+
MessageId = envelope.MessageId,
40+
CorrelationId = correlationId,
41+
Subject = subject.ReplaceSuffix(),
42+
Payload = envelope.Message,
43+
Attributes = envelope.MessageAttributes?.ToDictionary(
44+
kvp => kvp.Key,
45+
kvp => kvp.Value?.Value ?? string.Empty)
46+
};
47+
}
48+
49+
private static UnwrappedMessage UnwrapFromRawMessage(Message message)
50+
{
51+
var subject = message.GetMessageAttributeValue<string>("Subject") ?? "Default";
52+
var correlationId = message.GetMessageAttributeValue<string>("CorrelationId") ?? string.Empty;
3353

3454
return new UnwrappedMessage
3555
{
3656
MessageId = message.MessageId,
37-
CorrelationId = message.GetMessageAttributeValue<string>("CorrelationId") ?? string.Empty,
38-
Subject = (message.GetMessageAttributeValue<string>("Subject") ?? "Default").ReplaceSuffix(),
57+
CorrelationId = correlationId,
58+
Subject = subject.ReplaceSuffix(),
3959
Payload = message.Body ?? string.Empty,
4060
Attributes = message.MessageAttributes?.ToDictionary(
4161
kvp => kvp.Key,

src/KeeperData.Core/Messaging/MessageHandlers/MessageHandlerInfo.cs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,15 @@ namespace KeeperData.Core.Messaging.MessageHandlers;
22

33
public class MessageHandlerInfo
44
{
5-
public bool IsDynamic { get; }
65
public Type HandlerType { get; }
76

8-
private MessageHandlerInfo(bool isDynamic, Type handlerType)
7+
private MessageHandlerInfo(Type handlerType)
98
{
10-
IsDynamic = isDynamic;
119
HandlerType = handlerType;
1210
}
1311

14-
public static MessageHandlerInfo Dynamic(Type handlerType)
15-
{
16-
return new MessageHandlerInfo(true, handlerType);
17-
}
18-
1912
public static MessageHandlerInfo Typed(Type handlerType)
2013
{
21-
return new MessageHandlerInfo(false, handlerType);
14+
return new MessageHandlerInfo(handlerType);
2215
}
2316
}

src/KeeperData.Infrastructure/Messaging/Configuration/QueueConsumerOptions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@ public record QueueConsumerOptions
55
public required string QueueUrl { get; init; }
66
public int MaxNumberOfMessages { get; init; }
77
public int WaitTimeSeconds { get; init; }
8-
public bool Disabled { get; init; } = false;
8+
public bool Disabled { get; set; } = false;
99
}

src/KeeperData.Infrastructure/Messaging/Consumers/QueuePoller.cs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class QueuePoller(IServiceScopeFactory scopeFactory,
3131
private IQueuePollerObserver<MessageType>? _observer;
3232

3333
private Task? _pollingTask;
34-
private CancellationTokenSource? _cts;
34+
private CancellationTokenSource _cts = new();
3535

3636
private const string MESSAGE_SUFFIX = "Message";
3737

@@ -42,7 +42,7 @@ public Task StartAsync(CancellationToken cancellationToken)
4242
using var scope = _scopeFactory.CreateScope();
4343
_observer = scope.ServiceProvider.GetService<IQueuePollerObserver<MessageType>>();
4444

45-
if (_queueConsumerOptions?.Disabled == true)
45+
if (_queueConsumerOptions.Disabled == true)
4646
{
4747
_logger.LogInformation("Queue {queueUrl} disabled in config", _queueConsumerOptions.QueueUrl);
4848

@@ -60,11 +60,18 @@ public async Task StopAsync(CancellationToken cancellationToken)
6060
{
6161
_logger.LogInformation("QueuePoller stop requested.");
6262

63-
_cts?.Cancel();
63+
_cts.Cancel();
6464

6565
if (_pollingTask is { IsCompletedSuccessfully: false })
6666
{
67-
await _pollingTask;
67+
try
68+
{
69+
await _pollingTask;
70+
}
71+
catch (TaskCanceledException)
72+
{
73+
// Expected during cancellation
74+
}
6875
}
6976
}
7077

@@ -107,17 +114,16 @@ private async Task PollMessagesAsync(CancellationToken cancellationToken)
107114
MessageAttributeNames = ["All"]
108115
}, cancellationToken);
109116

110-
if (response?.Messages.Count == 0) continue;
117+
var messages = response?.Messages;
118+
119+
if (messages == null || messages.Count == 0) continue;
111120

112121
_logger.LogTrace("Completed receive for queue: {queueUrl}, Number of messages: {count}",
113-
_queueConsumerOptions.QueueUrl, response?.Messages.Count);
122+
_queueConsumerOptions.QueueUrl, messages.Count);
114123

115-
if (response?.Messages?.Count > 0)
124+
foreach (var message in messages)
116125
{
117-
foreach (var message in response.Messages)
118-
{
119-
await HandleMessageAsync(message, _queueConsumerOptions.QueueUrl, cancellationToken);
120-
}
126+
await HandleMessageAsync(message, _queueConsumerOptions.QueueUrl, cancellationToken);
121127
}
122128
}
123129
catch (OperationCanceledException)
@@ -142,7 +148,6 @@ private async Task HandleMessageAsync(Message message, string queueUrl, Cancella
142148
var unwrappedMessage = message.Unwrap(_messageSerializer);
143149

144150
var handlerTypes = _messageHandlerManager.GetHandlersForMessage(unwrappedMessage.Subject);
145-
146151
foreach (var handlerInfo in handlerTypes)
147152
{
148153
var messageType = _messageHandlerManager.GetMessageTypeByName($"{unwrappedMessage.Subject}{MESSAGE_SUFFIX}");

src/KeeperData.Infrastructure/Messaging/MessageHandlers/InMemoryMessageHandlerManager.cs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public void AddReceiver<T, TH>()
2121
{
2222
var messageType = GetMessageTypeKey<T>();
2323

24-
DoAddReceiver(typeof(TH), messageType, isDynamic: false);
24+
DoAddReceiver(typeof(TH), messageType);
2525

2626
if (!_messageTypes.Contains(typeof(T)))
2727
{
@@ -46,15 +46,15 @@ public bool HasHandlerForMessage<T>() where T : MessageType
4646
return HasHandlerForMessage(key);
4747
}
4848

49-
public IEnumerable<MessageHandlerInfo> GetHandlersForMessage(string messageType) => _handlers[messageType];
49+
public IEnumerable<MessageHandlerInfo> GetHandlersForMessage(string messageType) => _handlers[messageType.ReplaceSuffix()];
5050

5151
public IEnumerable<MessageHandlerInfo> GetHandlersForMessage<T>() where T : MessageType
5252
{
5353
var key = GetMessageTypeKey<T>();
5454
return GetHandlersForMessage(key);
5555
}
5656

57-
private void DoAddReceiver(Type handlerType, string messageType, bool isDynamic)
57+
private void DoAddReceiver(Type handlerType, string messageType)
5858
{
5959
if (!HasHandlerForMessage(messageType))
6060
{
@@ -67,13 +67,6 @@ private void DoAddReceiver(Type handlerType, string messageType, bool isDynamic)
6767
$"Handler Type {handlerType.Name} already registered for '{messageType}'", nameof(handlerType));
6868
}
6969

70-
if (isDynamic)
71-
{
72-
_handlers[messageType].Add(MessageHandlerInfo.Dynamic(handlerType));
73-
}
74-
else
75-
{
76-
_handlers[messageType].Add(MessageHandlerInfo.Typed(handlerType));
77-
}
70+
_handlers[messageType].Add(MessageHandlerInfo.Typed(handlerType));
7871
}
7972
}

0 commit comments

Comments
 (0)