Skip to content

Commit 34c8486

Browse files
CDMS-852: checks for compressed encoded queue messages and uncompresses and decodes before deserializing (#136)
1 parent 55898fa commit 34c8486

File tree

9 files changed

+222
-35
lines changed

9 files changed

+222
-35
lines changed

BtmsGateway.Test/Consumers/ConsumerMediatorTests.cs

Lines changed: 71 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using System.IO.Compression;
2+
using System.Text;
13
using System.Text.Json;
24
using BtmsGateway.Consumers;
35
using BtmsGateway.Domain;
@@ -35,16 +37,14 @@ public async Task WhenCustomsDeclaration_ShouldPassThroughToClearanceDecisionCon
3537
Context = context,
3638
};
3739

38-
var message = JsonSerializer.Deserialize<JsonElement>(
39-
JsonSerializer.Serialize(
40-
new ResourceEvent<CustomsDeclaration>
41-
{
42-
ResourceId = "mrn",
43-
ResourceType = ResourceEventResourceTypes.CustomsDeclaration,
44-
Operation = ResourceEventOperations.Created,
45-
SubResourceType = ResourceEventSubResourceTypes.ClearanceDecision,
46-
}
47-
)
40+
var message = JsonSerializer.Serialize(
41+
new ResourceEvent<CustomsDeclaration>
42+
{
43+
ResourceId = "mrn",
44+
ResourceType = ResourceEventResourceTypes.CustomsDeclaration,
45+
Operation = ResourceEventOperations.Created,
46+
SubResourceType = ResourceEventSubResourceTypes.ClearanceDecision,
47+
}
4848
);
4949

5050
var act = async () => await subject.OnHandle(message, CancellationToken.None);
@@ -71,16 +71,14 @@ public async Task WhenProcessingError_ShouldPassThroughToProcessingErrorConsumer
7171
Context = context,
7272
};
7373

74-
var message = JsonSerializer.Deserialize<JsonElement>(
75-
JsonSerializer.Serialize(
76-
new ResourceEvent<ProcessingErrorResource>
77-
{
78-
ResourceId = "mrn",
79-
ResourceType = ResourceEventResourceTypes.ProcessingError,
80-
Operation = ResourceEventOperations.Created,
81-
Resource = new ProcessingErrorResource { ProcessingErrors = [new ProcessingError()] },
82-
}
83-
)
74+
var message = JsonSerializer.Serialize(
75+
new ResourceEvent<ProcessingErrorResource>
76+
{
77+
ResourceId = "mrn",
78+
ResourceType = ResourceEventResourceTypes.ProcessingError,
79+
Operation = ResourceEventOperations.Created,
80+
Resource = new ProcessingErrorResource { ProcessingErrors = [new ProcessingError()] },
81+
}
8482
);
8583

8684
var act = async () => await subject.OnHandle(message, CancellationToken.None);
@@ -107,19 +105,63 @@ public async Task WhenUnsupportedResourceType_ShouldNotThrow()
107105
Context = context,
108106
};
109107

110-
var message = JsonSerializer.Deserialize<JsonElement>(
111-
JsonSerializer.Serialize(
112-
new ResourceEvent<CustomsDeclaration>
113-
{
114-
ResourceId = "mrn",
115-
ResourceType = ResourceEventResourceTypes.ImportPreNotification,
116-
Operation = ResourceEventOperations.Created,
117-
}
118-
)
108+
var message = JsonSerializer.Serialize(
109+
new ResourceEvent<CustomsDeclaration>
110+
{
111+
ResourceId = "mrn",
112+
ResourceType = ResourceEventResourceTypes.ImportPreNotification,
113+
Operation = ResourceEventOperations.Created,
114+
}
119115
);
120116

121117
var act = async () => await subject.OnHandle(message, CancellationToken.None);
122118

123119
await act.Should().NotThrowAsync<ClearanceDecisionProcessingException>();
124120
}
121+
122+
[Fact]
123+
public async Task WhenCompressedMessage_ShouldPassThroughToConsumer()
124+
{
125+
var context = Substitute.For<IConsumerContext>();
126+
context.Headers.Returns(
127+
new Dictionary<string, object>
128+
{
129+
{ MessageBusHeaders.ResourceType, ResourceEventResourceTypes.CustomsDeclaration },
130+
{ MessageBusHeaders.ContentEncoding, "gzip, base64" },
131+
}
132+
);
133+
var subject = new ConsumerMediator(
134+
Substitute.For<IDecisionSender>(),
135+
Substitute.For<IErrorNotificationSender>(),
136+
Substitute.For<ILoggerFactory>()
137+
)
138+
{
139+
Context = context,
140+
};
141+
142+
var message = JsonSerializer.Serialize(
143+
new ResourceEvent<CustomsDeclaration>
144+
{
145+
ResourceId = "mrn",
146+
ResourceType = ResourceEventResourceTypes.CustomsDeclaration,
147+
Operation = ResourceEventOperations.Created,
148+
SubResourceType = ResourceEventSubResourceTypes.ClearanceDecision,
149+
}
150+
);
151+
152+
var act = async () => await subject.OnHandle(CompressMessage(message), CancellationToken.None);
153+
154+
await act.Should().ThrowAsync<ClearanceDecisionProcessingException>();
155+
}
156+
157+
private static string CompressMessage(string message)
158+
{
159+
var buffer = Encoding.UTF8.GetBytes(message);
160+
var memoryStream = new MemoryStream();
161+
using var gzipStream = new GZipStream(memoryStream, CompressionLevel.Optimal);
162+
gzipStream.Write(buffer, 0, buffer.Length);
163+
gzipStream.Flush();
164+
165+
return Convert.ToBase64String(memoryStream.ToArray());
166+
}
125167
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
using System.IO.Compression;
2+
using System.Text;
3+
using BtmsGateway.Utils;
4+
using FluentAssertions;
5+
6+
namespace BtmsGateway.Test.Utils;
7+
8+
public class MessageDeserializerTests
9+
{
10+
private const string Content = """{"hello": "there"}""";
11+
12+
[Fact]
13+
public void Deserialize_WhenGivenACompressedMessage_ThenShouldReturnDecompressedMessage()
14+
{
15+
var result = MessageDeserializer.Deserialize<object>(CompressMessage(Content), "gzip, base64")!;
16+
17+
result.ToString().Should().Be(Content);
18+
}
19+
20+
[Fact]
21+
public void Deserialize_WhenGivenAnUncompressedMessage_ThenShouldReturnUncompressedMessage()
22+
{
23+
var result = MessageDeserializer.Deserialize<object>(Content, null)!;
24+
25+
result.ToString().Should().Be(Content);
26+
}
27+
28+
[Fact]
29+
public void Deserialize_WhenGivenInvalidContentType_ShouldThrowException()
30+
{
31+
Assert.Throws<NotImplementedException>(() => MessageDeserializer.Deserialize<object>(Content, "brotli"));
32+
}
33+
34+
private static string CompressMessage(string message)
35+
{
36+
var buffer = Encoding.UTF8.GetBytes(message);
37+
var memoryStream = new MemoryStream();
38+
using var gzipStream = new GZipStream(memoryStream, CompressionLevel.Optimal);
39+
gzipStream.Write(buffer, 0, buffer.Length);
40+
gzipStream.Flush();
41+
42+
return Convert.ToBase64String(memoryStream.ToArray());
43+
}
44+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
using BtmsGateway.Utils;
2+
using FluentAssertions;
3+
4+
namespace BtmsGateway.Test.Utils;
5+
6+
public class ToStringSerializerTests
7+
{
8+
private readonly ToStringSerializer _toStringSerializer = new();
9+
10+
[Fact]
11+
public void Deserialize_String_Returns_String()
12+
{
13+
_toStringSerializer.Deserialize(null!, "sosig").Should().Be("sosig");
14+
}
15+
16+
[Fact]
17+
public void Deserialize_Byte_Returns_String()
18+
{
19+
_toStringSerializer.Deserialize(null!, "sosig"u8.ToArray()).Should().Be("sosig");
20+
}
21+
}

BtmsGateway/Consumers/ConsumerMediator.cs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
using BtmsGateway.Domain;
33
using BtmsGateway.Extensions;
44
using BtmsGateway.Services.Routing;
5+
using BtmsGateway.Utils;
6+
using BtmsGateway.Utils.Logging;
57
using Defra.TradeImportsDataApi.Domain.CustomsDeclaration;
68
using Defra.TradeImportsDataApi.Domain.Events;
79
using SlimMessageBus;
@@ -12,14 +14,16 @@ public class ConsumerMediator(
1214
IDecisionSender decisionSender,
1315
IErrorNotificationSender errorNotificationSender,
1416
ILoggerFactory loggerFactory
15-
) : IConsumer<JsonElement>, IConsumerWithContext
17+
) : IConsumer<string>, IConsumerWithContext
1618
{
1719
private readonly ILogger<ConsumerMediator> _logger = loggerFactory.CreateLogger<ConsumerMediator>();
1820

1921
public IConsumerContext Context { get; set; } = null!;
2022

21-
public Task OnHandle(JsonElement message, CancellationToken cancellationToken)
23+
public Task OnHandle(string received, CancellationToken cancellationToken)
2224
{
25+
var message = MessageDeserializer.Deserialize<JsonElement>(received, Context.Headers.GetContentEncoding());
26+
2327
var resourceType = Context.GetResourceType();
2428

2529
return resourceType switch

BtmsGateway/Extensions/AmazonConsumerExtensions.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
using System.Text.Json;
21
using BtmsGateway.Config;
32
using BtmsGateway.Consumers;
3+
using BtmsGateway.Utils;
4+
using Microsoft.Extensions.DependencyInjection.Extensions;
45
using SlimMessageBus.Host;
56
using SlimMessageBus.Host.AmazonSQS;
6-
using SlimMessageBus.Host.Serialization.SystemTextJson;
7+
using SlimMessageBus.Host.Serialization;
78

89
namespace BtmsGateway.Extensions;
910

@@ -23,11 +24,15 @@ IConfiguration configuration
2324
cfg.ClientProviderFactory = _ => new CdpCredentialsSqsClientProvider(cfg.SqsClientConfig, configuration);
2425
});
2526

26-
messageBusBuilder.AddJsonSerializer();
27+
messageBusBuilder.RegisterSerializer<ToStringSerializer>(s =>
28+
{
29+
s.TryAddSingleton(_ => new ToStringSerializer());
30+
s.TryAddSingleton<IMessageSerializer<string>>(svp => svp.GetRequiredService<ToStringSerializer>());
31+
});
2732

2833
messageBusBuilder
2934
.AutoStartConsumersEnabled(options.AutoStartConsumers)
30-
.Consume<JsonElement>(x =>
35+
.Consume<string>(x =>
3136
x.WithConsumer<ConsumerMediator>()
3237
.Queue(options.OutboundClearanceDecisionsQueueName)
3338
.Instances(options.ConsumersPerHost)

BtmsGateway/Extensions/ConsumerContextExtensions.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ public static class MessageBusHeaders
1111
public const string SubResourceType = nameof(SubResourceType);
1212
public const string SqsBusMessage = "Sqs_Message";
1313
public const string ResourceId = nameof(ResourceId);
14+
public const string ContentEncoding = "Content-Encoding";
1415
}
1516

1617
[ExcludeFromCodeCoverage]

BtmsGateway/Utils/Logging/ReadOnlyDictionaryExtensions.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using BtmsGateway.Extensions;
2+
13
namespace BtmsGateway.Utils.Logging;
24

35
public static class ReadOnlyDictionaryExtensions
@@ -6,4 +8,11 @@ public static class ReadOnlyDictionaryExtensions
68
{
79
return headers.TryGetValue(traceHeader, out var traceId) ? traceId.ToString()?.Replace("-", "") : null;
810
}
11+
12+
public static string? GetContentEncoding(this IReadOnlyDictionary<string, object> headers)
13+
{
14+
return headers.TryGetValue(MessageBusHeaders.ContentEncoding, out var contentEncoding)
15+
? contentEncoding.ToString()
16+
: null;
17+
}
918
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using System.IO.Compression;
2+
using System.Text;
3+
using System.Text.Json;
4+
5+
namespace BtmsGateway.Utils;
6+
7+
public static class MessageDeserializer
8+
{
9+
public static T? Deserialize<T>(string message, string? contentEncoding)
10+
{
11+
if (contentEncoding != null && contentEncoding != "gzip, base64")
12+
{
13+
throw new NotImplementedException(
14+
"Only 'gzip, base64' content encoding is supported, passed: " + contentEncoding
15+
);
16+
}
17+
18+
if (contentEncoding == null)
19+
return JsonSerializer.Deserialize<T>(message);
20+
21+
var compressedBytes = Convert.FromBase64String(message);
22+
using var compressedStream = new MemoryStream(compressedBytes);
23+
using var gzipStream = new GZipStream(compressedStream, CompressionMode.Decompress);
24+
using var reader = new StreamReader(gzipStream, Encoding.UTF8);
25+
26+
return JsonSerializer.Deserialize<T>(reader.ReadToEnd());
27+
}
28+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
using System.Diagnostics.CodeAnalysis;
2+
using System.Text;
3+
using SlimMessageBus.Host.Serialization;
4+
5+
namespace BtmsGateway.Utils;
6+
7+
public class ToStringSerializer : IMessageSerializer, IMessageSerializer<string>, IMessageSerializerProvider
8+
{
9+
[ExcludeFromCodeCoverage]
10+
public byte[] Serialize(Type t, object message)
11+
{
12+
throw new NotImplementedException();
13+
}
14+
15+
public object Deserialize(Type t, string payload)
16+
{
17+
return payload;
18+
}
19+
20+
public object Deserialize(Type t, byte[] payload)
21+
{
22+
return Encoding.UTF8.GetString(payload);
23+
}
24+
25+
[ExcludeFromCodeCoverage]
26+
string IMessageSerializer<string>.Serialize(Type t, object message)
27+
{
28+
return message.ToString()!;
29+
}
30+
31+
[ExcludeFromCodeCoverage]
32+
public IMessageSerializer GetSerializer(string path) => this;
33+
}

0 commit comments

Comments
 (0)