Skip to content

Commit cf8901c

Browse files
committed
Completed messaging refactoring. Test coverage to be completed.
2 parents 80afc37 + 2e3d042 commit cf8901c

55 files changed

Lines changed: 1349 additions & 503 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

compose/start-localstack.sh

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,58 @@ queue_url=$(awslocal sqs create-queue \
1919

2020
echo "SQS Queue created: $queue_url"
2121

22+
# Get the SQS Queue ARN
23+
queue_arn=$(awslocal sqs get-queue-attributes \
24+
--queue-url "$queue_url" \
25+
--attribute-name QueueArn \
26+
--output text \
27+
--query 'Attributes.QueueArn')
28+
29+
echo "SQS Queue ARN: $queue_arn"
30+
31+
# Create SNS Topics
32+
topic_arn=$(awslocal sns create-topic \
33+
--name ls-keeper-data-bridge-events \
34+
--endpoint-url=http://localhost:4566 \
35+
--output text \
36+
--query 'TopicArn')
37+
38+
echo "SNS Topic created: $topic_arn"
39+
40+
# Construct the policy JSON inline with escaped quotes
41+
policy_json=$(cat <<EOF
42+
{
43+
"Version": "2012-10-17",
44+
"Statement": [
45+
{
46+
"Effect": "Allow",
47+
"Principal": "*",
48+
"Action": "sqs:SendMessage",
49+
"Resource": "$queue_arn",
50+
"Condition": {
51+
"ArnEquals": {
52+
"aws:SourceArn": "$topic_arn"
53+
}
54+
}
55+
}
56+
]
57+
}
58+
EOF
59+
)
60+
61+
# Set SQS policy
62+
awslocal sqs set-queue-attributes \
63+
--queue-url "$queue_url" \
64+
--attributes "{\"Policy\": \"$(
65+
echo "$policy_json" | jq -c
66+
)\"}"
67+
68+
# Subscribe the Queue to the Topic
69+
awslocal sns subscribe \
70+
--topic-arn "$topic_arn" \
71+
--protocol sqs \
72+
--notification-endpoint "$queue_arn"
73+
74+
echo "SNS Topic subscription complete"
75+
2276
echo "Bootstrapping Complete"

docker-compose.override.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ services:
66
- ASPNETCORE_ENVIRONMENT=Development
77
- ASPNETCORE_HTTP_PORTS=8080
88
- AWS__ServiceURL=http://localstack-emulator:4566
9+
#- AWS__ServiceURL=http://sqs.eu-west-2.localhost.localstack.cloud:4566/
910
- Mongo__DatabaseUri=mongodb://mongodb:27017
11+
- QueueConsumerOptions__IntakeEventQueueOptions__QueueUrl=http://sqs.eu-west-2.127.0.0.1:4566/000000000000/ls_keeper_data_intake_queue
1012
ports:
1113
- "8080"
1214
volumes:

src/KeeperData.Api/appsettings.IntegrationTest.json

Lines changed: 0 additions & 30 deletions
This file was deleted.

src/KeeperData.Api/appsettings.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
},
1111
"QueueConsumerOptions": {
1212
"IntakeEventQueueOptions": {
13-
"QueueUrl": "http://sqs.eu-west-2.127.0.0.1:4566/000000000000/ls_keeper_data_intake_queue",
13+
"QueueUrl": "Set in cdp-app-config (or for local development using docker-compose.override.yml)",
1414
"MaxNumberOfMessages": 10,
15-
"WaitTimeSeconds": 1,
15+
"WaitTimeSeconds": 5,
1616
"Disabled": false
1717
}
1818
},
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using KeeperData.Core.Messaging.Contracts;
2+
using KeeperData.Core.Messaging.Contracts.V1;
3+
using KeeperData.Core.Messaging.MessageHandlers;
4+
using KeeperData.Core.Messaging.Serializers;
5+
6+
namespace KeeperData.Application.MessageHandlers;
7+
8+
public class PlaceholderMessageHandler(IUnwrappedMessageSerializer<PlaceholderMessage> serializer) : IMessageHandler<PlaceholderMessage>
9+
{
10+
private readonly IUnwrappedMessageSerializer<PlaceholderMessage> _serializer = serializer;
11+
12+
public async Task<MessageType> Handle(UnwrappedMessage message, CancellationToken cancellationToken = default)
13+
{
14+
ArgumentNullException.ThrowIfNull(message, nameof(message));
15+
16+
var messagePayload = _serializer.Deserialize(message);
17+
18+
// Do something with the messagePayload
19+
20+
return await Task.FromResult(messagePayload!);
21+
}
22+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
namespace KeeperData.Core.Exceptions;
2+
3+
public class NonRetryableException : Exception
4+
{
5+
public NonRetryableException(string message, Exception inner) : base(message, inner)
6+
{
7+
}
8+
public NonRetryableException(string message) : base(message)
9+
{
10+
}
11+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
namespace KeeperData.Core.Exceptions;
2+
3+
public class RetryableException : Exception
4+
{
5+
public RetryableException(string message, Exception inner) : base(message, inner)
6+
{
7+
}
8+
public RetryableException(string message) : base(message)
9+
{
10+
}
11+
}

src/KeeperData.Core/KeeperData.Core.csproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
</PropertyGroup>
88

99
<ItemGroup>
10+
<PackageReference Include="AWSSDK.Core" Version="4.0.0.26" />
11+
<PackageReference Include="AWSSDK.SQS" Version="4.0.0.15" />
12+
<PackageReference Include="AWSSDK.SecurityToken" Version="4.0.2.1" />
1013
<PackageReference Include="MongoDB.Driver" Version="2.28.0" />
1114
</ItemGroup>
1215

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace KeeperData.Core.Messaging.Consumers;
2+
3+
public interface IQueuePoller
4+
{
5+
Task StartAsync(CancellationToken token);
6+
Task StopAsync(CancellationToken token);
7+
}

src/KeeperData.Core/Messaging/Contracts/IntakeEventModel.cs

Lines changed: 0 additions & 7 deletions
This file was deleted.

0 commit comments

Comments
 (0)