Skip to content
This repository was archived by the owner on Aug 18, 2021. It is now read-only.

Commit 228d2e5

Browse files
committed
feat: added simple test for ServiceBus
This is just a draft for testing the ServiceBus implementation.
1 parent f4899e4 commit 228d2e5

File tree

4 files changed

+248
-17
lines changed

4 files changed

+248
-17
lines changed

src/Liquid.Activation/Worker/LightWorker.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public abstract class LightWorker : LightBackgroundTask, ILightWorker
2525
protected readonly static Dictionary<MethodInfo, QueueAttribute> _queues = new Dictionary<MethodInfo, QueueAttribute>();
2626
protected readonly static Dictionary<MethodInfo, TopicAttribute> _topics = new Dictionary<MethodInfo, TopicAttribute>();
2727
private readonly List<string> _inputValidationErrors = new List<string>();
28-
protected ILightTelemetry Telemetry { get; } = Workbench.Instance.Telemetry != null ? (ILightTelemetry)Workbench.Instance.Telemetry.CloneService() : null;
28+
protected ILightTelemetry Telemetry => Workbench.Instance.Telemetry;
2929
protected ILightCache Cache => Workbench.Instance.Cache;
3030
//Instance of CriticHandler to inject on the others classes
3131
private readonly CriticHandler _criticHandler = new CriticHandler();

src/Liquid.OnAzure/Liquid.OnAzure.csproj

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
<DebugType>full</DebugType>
1616
<DebugSymbols>true</DebugSymbols>
1717
</PropertyGroup>
18+
<ItemGroup>
19+
<AdditionalFiles Include="..\..\stylecop.json" Link="stylecop.json" />
20+
</ItemGroup>
1821
<ItemGroup>
1922
<Compile Remove="Telemetry\AppInsightsTelemetryMiddleware.cs" />
2023
</ItemGroup>
@@ -29,6 +32,10 @@
2932
<PackageReference Include="Microsoft.CodeAnalysis.FxCopAnalyzers" Version="2.9.3" />
3033
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="2.2.0" />
3134
<PackageReference Include="Microsoft.Extensions.Caching.Redis" Version="2.2.0" />
35+
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.113">
36+
<PrivateAssets>all</PrivateAssets>
37+
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
38+
</PackageReference>
3239
<PackageReference Include="WindowsAzure.Storage" Version="9.3.3" />
3340
</ItemGroup>
3441
<ItemGroup>

src/Liquid.OnAzure/MessageBuses/ServiceBus.cs

Lines changed: 167 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,125 @@
1-
using Liquid.Base.Interfaces;
1+
// Copyright (c) Avanade Inc. All rights reserved.
2+
// Licensed under the MIT License. See LICENSE in the project root for license information.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Reflection;
7+
using System.Threading.Tasks;
8+
using Liquid.Activation;
29
using Liquid.Domain;
310
using Liquid.Domain.Base;
4-
using Liquid.Activation;
511
using Liquid.Runtime.Configuration.Base;
612
using Liquid.Runtime.Telemetry;
713
using Microsoft.Azure.ServiceBus;
8-
using System;
9-
using System.Collections.Generic;
10-
using System.Reflection;
11-
using System.Text;
12-
using System.Threading.Tasks;
1314

1415
namespace Liquid.OnAzure
1516
{
1617
/// <summary>
17-
/// Implementation of the communication component between queues and topics of the Azure, this class is specific to azure
18+
/// Defines an object capable of creating instances of <see cref="IQueueClient"/>.
19+
/// </summary>
20+
public interface IQueueClientFactory
21+
{
22+
/// <summary>
23+
/// Creates a new instance of <see cref="IQueueClient"/>.
24+
/// </summary>
25+
/// <param name="connectionString">The connection string for the client.</param>
26+
/// <param name="queueName">The name of the queue.</param>
27+
/// <param name="receiveMode">The receive mode that the client will connect to the queue.</param>
28+
/// <returns>A new instance of <see cref="IQueueClient"/>.</returns>
29+
IQueueClient CreateClient(string connectionString, string queueName, ReceiveMode receiveMode);
30+
}
31+
32+
/// <summary>
33+
/// Defines an object capable of creating instances of <see cref="ISubscriptionClient"/>.
34+
/// </summary>
35+
public interface ISubscriptionClientFactory
36+
{
37+
/// <summary>
38+
/// Creates a new instance of <see cref="ISubscriptionClient"/>.
39+
/// </summary>
40+
/// <param name="connectionString">The connection string for the client.</param>
41+
/// <param name="topicName">The name of the topic to connect to.</param>
42+
/// <param name="subscriptionName">Identifies the subscription to this topic.</param>
43+
/// <param name="receiveMode">The receive mode that the client will connect to the queue.</param>
44+
/// <returns>A new instance of <see cref="ISubscriptionClient"/>.</returns>
45+
ISubscriptionClient CreateClient(string connectionString, string topicName, string subscriptionName, ReceiveMode receiveMode);
46+
}
47+
48+
/// <summary>
49+
/// Configuration source for <see cref="ServiceBus"/>.
50+
/// </summary>
51+
// TODO: should remove this class once we move to .NET configuration system
52+
public interface IServiceBusConfigurationProvider
53+
{
54+
/// <summary>
55+
/// Gets the configuration for a <see cref="ServiceBus"/>.
56+
/// </summary>
57+
/// <returns>
58+
/// The current configuration for a service bus.
59+
/// </returns>
60+
ServiceBusConfiguration GetConfiguration();
61+
62+
/// <summary>
63+
/// Gets the configuration for a <see cref="ServiceBus"/>.
64+
/// </summary>
65+
/// <param name="connectionName">
66+
/// Identifies which connection should be retrieved from the file.
67+
/// </param>
68+
/// <returns>
69+
/// The current configuration for a service bus.
70+
/// </returns>
71+
ServiceBusConfiguration GetConfiguration(string connectionName);
72+
}
73+
74+
/// <summary>
75+
/// Implementation of the communication component between queues and topics of the Azure, this class is specific to azure.
1876
/// </summary>
1977
public class ServiceBus : LightWorker, IWorkbenchService
2078
{
79+
/// <summary>
80+
/// Factory used to create a <see cref="IQueueClient"/>.
81+
/// </summary>
82+
private readonly IQueueClientFactory _queueClientFactory = new DefaultQueueClientFactory();
83+
84+
/// <summary>
85+
/// Factory used to create a <see cref="ISubscriptionClient"/>.
86+
/// </summary>
87+
private readonly ISubscriptionClientFactory _subscriptionClientFactory = new DefaultSubscriptionClientFactory();
88+
89+
/// <summary>
90+
/// Service that retrives a <see cref="ServiceBusConfiguration"/>.
91+
/// </summary>
92+
private readonly IServiceBusConfigurationProvider _configurationProvider = new DefaultServiceBusConfigurationProvider();
93+
94+
/// <summary>
95+
/// Initializes a new instance of the <see cref="ServiceBus"/> class.
96+
/// </summary>
97+
public ServiceBus()
98+
{
99+
}
100+
101+
/// <summary>
102+
/// Initializes a new instance of the <see cref="ServiceBus"/> class.
103+
/// </summary>
104+
/// <param name="queueClientFactory">
105+
/// Dependency. Used to obtain new instances of a <see cref="IQueueClient"/>.
106+
/// </param>
107+
/// <param name="subscriptionClientFactory">
108+
/// Dependency. Used to obtain new instances of a <see cref="ISubscriptionClient"/>.
109+
/// </param>
110+
/// <param name="configurationProvider">
111+
/// Dependency. Used to retrieve a configuration for this class.
112+
/// </param>
113+
public ServiceBus(
114+
IQueueClientFactory queueClientFactory,
115+
ISubscriptionClientFactory subscriptionClientFactory,
116+
IServiceBusConfigurationProvider configurationProvider)
117+
{
118+
_queueClientFactory = queueClientFactory ?? throw new ArgumentNullException(nameof(queueClientFactory));
119+
_subscriptionClientFactory = subscriptionClientFactory ?? throw new ArgumentNullException(nameof(subscriptionClientFactory));
120+
_configurationProvider = configurationProvider ?? throw new ArgumentNullException(nameof(configurationProvider));
121+
}
122+
21123
/// <summary>
22124
/// Implementation of the start process queue and process topic. It must be called parent before start processes.
23125
/// </summary>
@@ -36,16 +138,17 @@ public override void Initialize()
36138
/// <returns>StringConnection of the ServiceBus</returns>
37139
private string GetConnection<T>(KeyValuePair<MethodInfo, T> item)
38140
{
39-
MethodInfo method = item.Key;
40-
string connectionKey = GetKeyConnection(method);
41-
ServiceBusConfiguration config = null;
141+
var method = item.Key;
142+
var connectionKey = GetKeyConnection(method);
143+
144+
ServiceBusConfiguration config;
42145
if (string.IsNullOrEmpty(connectionKey)) // Load specific settings if provided
43146
{
44-
config = LightConfigurator.Config<ServiceBusConfiguration>($"{nameof(ServiceBus)}");
147+
config = _configurationProvider.GetConfiguration();//LightConfigurator.Config<ServiceBusConfiguration>($"{nameof(ServiceBus)}");
45148
}
46149
else
47150
{
48-
config = LightConfigurator.Config<ServiceBusConfiguration>($"{nameof(ServiceBus)}_{connectionKey}");
151+
config = _configurationProvider.GetConfiguration(connectionKey);//LightConfigurator.Config<ServiceBusConfiguration>($"{nameof(ServiceBus)}_{connectionKey}");
49152
}
50153

51154
return config.ConnectionString;
@@ -80,7 +183,7 @@ public void ProcessQueue()
80183
int takeQuantity = queue.Value.TakeQuantity;
81184

82185
//Register Trace on the telemetry
83-
QueueClient queueReceiver = new QueueClient(GetConnection(queue), queueName, receiveMode);
186+
var queueReceiver = _queueClientFactory.CreateClient(GetConnection(queue), queueName, receiveMode);
84187

85188
//Register the method to process receive message
86189
//The RegisterMessageHandler is validate for all register exist on the queue, without need loop for items
@@ -114,7 +217,7 @@ await queueReceiver.DeadLetterAsync(message.SystemProperties.LockToken,
114217
{
115218
Exception moreInfo = new Exception($"Error setting up queue consumption from service bus. See inner exception for details. Message={exception.Message}", exception);
116219
//Use the class instead of interface because tracking exceptions directly is not supposed to be done outside AMAW (i.e. by the business code)
117-
((LightTelemetry)Workbench.Instance.Telemetry).TrackException(moreInfo);
220+
(Workbench.Instance.Telemetry as LightTelemetry)?.TrackException(moreInfo);
118221
}
119222
}
120223

@@ -132,14 +235,18 @@ private void ProcessSubscription()
132235
string topicName = topic.Value.TopicName;
133236
string subscriptName = topic.Value.Subscription;
134237
ReceiveMode receiveMode = ReceiveMode.PeekLock;
238+
135239
if (topic.Value.DeleteAfterRead)
136240
{
137241
receiveMode = ReceiveMode.ReceiveAndDelete;
138242
}
243+
139244
int takeQuantity = topic.Value.TakeQuantity;
140245

141246
//Register Trace on the telemetry
142-
SubscriptionClient subscriptionClient = new SubscriptionClient(GetConnection(topic), topicName, subscriptName, receiveMode, null);
247+
var subscriptionClient = _subscriptionClientFactory.CreateClient(
248+
GetConnection(topic), topicName, subscriptName, receiveMode
249+
);
143250

144251
//Register the method to process receive message
145252
//The RegisterMessageHandler is validate for all register exist on the queue, without need loop for items
@@ -208,5 +315,49 @@ protected override Task ProcessAsync()
208315
{
209316
throw new NotImplementedException();
210317
}
318+
319+
/// <summary>
320+
/// Default implementation for <see cref="IQueueClientFactory"/>,
321+
/// creates instances of <see cref="IQueueClient"/>.
322+
/// </summary>
323+
private class DefaultQueueClientFactory : IQueueClientFactory
324+
{
325+
/// <inheritdoc/>
326+
public IQueueClient CreateClient(string connectionString, string queueName, ReceiveMode receiveMode)
327+
{
328+
return new QueueClient(connectionString, queueName, receiveMode);
329+
}
330+
}
331+
332+
/// <summary>
333+
/// Default implementation for <see cref="ISubscriptionClientFactory"/>,
334+
/// creates instances of <see cref="SubscriptionClient"/>.
335+
/// </summary>
336+
private class DefaultSubscriptionClientFactory : ISubscriptionClientFactory
337+
{
338+
/// <inheritdoc/>
339+
public ISubscriptionClient CreateClient(string connectionString, string topicName, string subscriptionName, ReceiveMode mode)
340+
{
341+
return new SubscriptionClient(connectionString, topicName, subscriptionName, mode, null);
342+
}
343+
}
344+
345+
/// <summary>
346+
/// Retrieves configuration using <see cref="LightConfigurator"/>.
347+
/// </summary>
348+
private class DefaultServiceBusConfigurationProvider : IServiceBusConfigurationProvider
349+
{
350+
/// <inheritdoc/>
351+
public ServiceBusConfiguration GetConfiguration()
352+
{
353+
return LightConfigurator.Config<ServiceBusConfiguration>($"{nameof(ServiceBus)}");
354+
}
355+
356+
/// <inheritdoc/>
357+
public ServiceBusConfiguration GetConfiguration(string connectionKey)
358+
{
359+
return LightConfigurator.Config<ServiceBusConfiguration>($"{nameof(ServiceBus)}_{connectionKey}");
360+
}
361+
}
211362
}
212363
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Copyright (c) Avanade Inc. All rights reserved.
2+
// Licensed under the MIT License. See LICENSE in the project root for license information.
3+
4+
using System;
5+
using System.Diagnostics.CodeAnalysis;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using Liquid.Activation;
9+
using Liquid.Interfaces;
10+
using Liquid.Tests;
11+
using Microsoft.Azure.ServiceBus;
12+
using NSubstitute;
13+
using Xunit;
14+
15+
namespace Liquid.OnAzure.Tests
16+
{
17+
public class ServiceBusTests
18+
{
19+
public ServiceBusTests()
20+
{
21+
Workbench.Instance.Reset();
22+
Workbench.Instance.AddToCache(WorkbenchServiceType.Telemetry, Substitute.For<ILightTelemetry>());
23+
}
24+
25+
[Theory, AutoSubstituteData]
26+
public void InitializeWhenExistsLightWorkerForQueueThenListenerIsCreated(ServiceBusConfiguration configuration)
27+
{
28+
// ARRANGE
29+
var queueClientFactory = Substitute.For<IQueueClientFactory>();
30+
var queueClient = Substitute.For<IQueueClient>();
31+
32+
queueClientFactory.CreateClient(default, default, default).ReturnsForAnyArgs(queueClient);
33+
34+
var subscriptionClientFactory = Substitute.For<ISubscriptionClientFactory>();
35+
var subscriptionClient = Substitute.For<ISubscriptionClient>();
36+
37+
subscriptionClientFactory.CreateClient(default, default, default, default).ReturnsForAnyArgs(subscriptionClient);
38+
39+
var configurationProvider = Substitute.For<IServiceBusConfigurationProvider>();
40+
41+
configurationProvider.GetConfiguration(Arg.Any<string>()).Returns(configuration);
42+
43+
var sut = new ServiceBus(queueClientFactory, subscriptionClientFactory, configurationProvider);
44+
45+
// ACT
46+
sut.Initialize();
47+
48+
// ASSERT
49+
queueClient
50+
.Received(1)
51+
.RegisterMessageHandler(Arg.Any<Func<Message, CancellationToken, Task>>(), Arg.Any<MessageHandlerOptions>());
52+
}
53+
54+
[MessageBus("Test")]
55+
[SuppressMessage(
56+
"Design",
57+
"CA1034:Nested types should not be visible",
58+
Justification = "Used by tests and must be public")]
59+
public class MyWorker : LightWorker
60+
{
61+
[Queue("QueueName")]
62+
public static void MyQueueMethod(string message)
63+
{
64+
if (message is null)
65+
{
66+
throw new ArgumentNullException(nameof(message));
67+
}
68+
69+
Console.WriteLine(message.ToUpperInvariant() + "asd");
70+
}
71+
}
72+
}
73+
}

0 commit comments

Comments
 (0)