Skip to content

Commit 299b3e2

Browse files
authored
Kros.MassTransit.AzureServiceBus library (#44)
1 parent 114ded6 commit 299b3e2

11 files changed

+729
-8
lines changed

Kros.AspNetCore.sln

+13
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,14 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Kros.ApplicationInsights.Ex
1919
EndProject
2020
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Kros.Swagger.Extensions", "src\Kros.Swagger.Extensions\Kros.Swagger.Extensions.csproj", "{BC8159E1-64B5-404E-917F-F17330AC6624}"
2121
EndProject
22+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kros.MassTransit.AzureServiceBus", "src\Kros.MassTransit.AzureServiceBus\Kros.MassTransit.AzureServiceBus.csproj", "{624CDE2A-410F-4EE3-BB0E-71B2F3D42AF2}"
23+
EndProject
24+
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{821CE0B1-32F3-42A6-9169-B7EDAFA86FB2}"
25+
ProjectSection(SolutionItems) = preProject
26+
build-ci.yml = build-ci.yml
27+
build.yml = build.yml
28+
EndProjectSection
29+
EndProject
2230
Global
2331
GlobalSection(SolutionConfigurationPlatforms) = preSolution
2432
Debug|Any CPU = Debug|Any CPU
@@ -49,6 +57,10 @@ Global
4957
{BC8159E1-64B5-404E-917F-F17330AC6624}.Debug|Any CPU.Build.0 = Debug|Any CPU
5058
{BC8159E1-64B5-404E-917F-F17330AC6624}.Release|Any CPU.ActiveCfg = Release|Any CPU
5159
{BC8159E1-64B5-404E-917F-F17330AC6624}.Release|Any CPU.Build.0 = Release|Any CPU
60+
{624CDE2A-410F-4EE3-BB0E-71B2F3D42AF2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
61+
{624CDE2A-410F-4EE3-BB0E-71B2F3D42AF2}.Debug|Any CPU.Build.0 = Debug|Any CPU
62+
{624CDE2A-410F-4EE3-BB0E-71B2F3D42AF2}.Release|Any CPU.ActiveCfg = Release|Any CPU
63+
{624CDE2A-410F-4EE3-BB0E-71B2F3D42AF2}.Release|Any CPU.Build.0 = Release|Any CPU
5264
EndGlobalSection
5365
GlobalSection(SolutionProperties) = preSolution
5466
HideSolutionNode = FALSE
@@ -60,6 +72,7 @@ Global
6072
{808D81B7-477D-4576-83AA-A405C63ABCA5} = {F2FE0B51-B10D-491C-AC69-1C9D08D4514E}
6173
{879989F6-7103-4F06-A865-E862EBC31468} = {BECA5997-5362-4C3D-AEF9-C2C6E40CB616}
6274
{BC8159E1-64B5-404E-917F-F17330AC6624} = {BECA5997-5362-4C3D-AEF9-C2C6E40CB616}
75+
{624CDE2A-410F-4EE3-BB0E-71B2F3D42AF2} = {BECA5997-5362-4C3D-AEF9-C2C6E40CB616}
6376
EndGlobalSection
6477
GlobalSection(ExtensibilityGlobals) = postSolution
6578
SolutionGuid = {BF06D895-E4AC-4374-95F5-89F47567F9F6}

build.yml

+15-8
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
trigger:
22
tags:
3-
include: [ 'aspnetcore-v*', 'mediatr-v*', 'appinsights-v*', 'swagger-v*' ]
3+
include: [ 'appinsights-v*', 'aspnetcore-v*', 'masstransitaz-v*', 'mediatr-v*', 'swagger-v*' ]
44

55
pool: Default
66

@@ -17,12 +17,14 @@ variables:
1717
- group: Nuget
1818
- name: buildConfiguration
1919
value: 'Release'
20+
- name: project.AppInsights
21+
value: 'Kros.ApplicationInsights.Extensions'
2022
- name: project.AspNetCore
2123
value: 'Kros.AspNetCore'
24+
- name: project.MassTransitAzure
25+
value: 'Kros.MassTransit.AzureServiceBus'
2226
- name: project.MediatR
2327
value: 'Kros.MediatR.Extensions'
24-
- name: project.AppInsights
25-
value: 'Kros.ApplicationInsights.Extensions'
2628
- name: project.Swagger
2729
value: 'Kros.Swagger.Extensions'
2830
- name: 'project.Current'
@@ -31,18 +33,22 @@ variables:
3133
value: 0
3234

3335
steps:
36+
- powershell: echo '##vso[task.setvariable variable=project.Current]$(project.AppInsights)'
37+
displayName: 'Set project: $(project.AppInsights)'
38+
condition: startsWith(variables['Build.SourceBranch'], 'refs/tags/appinsights-v')
39+
3440
- powershell: echo '##vso[task.setvariable variable=project.Current]$(project.AspNetCore)'
3541
displayName: 'Set project: $(project.AspNetCore)'
3642
condition: startsWith(variables['Build.SourceBranch'], 'refs/tags/aspnetcore-v')
3743

44+
- powershell: echo '##vso[task.setvariable variable=project.Current]$(project.MassTransitAzure)'
45+
displayName: 'Set project: $(project.MassTransitAzure)'
46+
condition: startsWith(variables['Build.SourceBranch'], 'refs/tags/masstransitaz-v')
47+
3848
- powershell: echo '##vso[task.setvariable variable=project.Current]$(project.MediatR)'
3949
displayName: 'Set project: $(project.MediatR)'
4050
condition: startsWith(variables['Build.SourceBranch'], 'refs/tags/mediatr-v')
4151

42-
- powershell: echo '##vso[task.setvariable variable=project.Current]$(project.AppInsights)'
43-
displayName: 'Set project: $(project.AppInsights)'
44-
condition: startsWith(variables['Build.SourceBranch'], 'refs/tags/appinsights-v')
45-
4652
- powershell: echo '##vso[task.setvariable variable=project.Current]$(project.Swagger)'
4753
displayName: 'Set project: $(project.Swagger)'
4854
condition: startsWith(variables['Build.SourceBranch'], 'refs/tags/swagger-v')
@@ -51,9 +57,10 @@ steps:
5157
echo No project was specified.
5258
echo Build must be triggered with correct tag and based on the tag name, the project is selected.
5359
echo Available tag names and their projects:
60+
echo - appinsights-v* - Kros.ApplicationInsights.Extensions
5461
echo - aspnetcore-v* - Kros.AspNetCore
62+
echo - masstransitaz-v* - Kros.MassTransit.AzureServiceBus
5563
echo - mediatr-v* - Kros.MediatR.Extensions
56-
echo - appinsights-v* - Kros.ApplicationInsights.Extensions
5764
echo - swagger-v* - Kros.Swagger.Extensions
5865
exit 1
5966
displayName: 'Check project name'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
using Microsoft.Azure.ServiceBus;
2+
using System.Collections.Generic;
3+
4+
namespace Kros.MassTransit.AzureServiceBus
5+
{
6+
/// <summary>
7+
/// Options for Azure service bus.
8+
/// </summary>
9+
public class AzureServiceBusOptions
10+
{
11+
/// <summary>
12+
/// Connection string for service bus.
13+
/// </summary>
14+
public string ConnectionString { get; set; }
15+
16+
/// <summary>
17+
/// Base service bus endpoint.
18+
/// </summary>
19+
public string Endpoint
20+
{
21+
get
22+
{
23+
string endpoint = new ServiceBusConnectionStringBuilder(ConnectionString).Endpoint;
24+
if (!string.IsNullOrEmpty(endpoint) && endpoint.EndsWith("/"))
25+
{
26+
return endpoint;
27+
}
28+
else
29+
{
30+
return endpoint + "/";
31+
}
32+
}
33+
}
34+
35+
/// <summary>
36+
/// Token time to live in seconds. If 0, default value <see cref=" ConfigDefaults.TokenTimeToLive"/> is used.
37+
/// </summary>
38+
public int TokenTimeToLive { get; set; }
39+
40+
/// <summary>
41+
/// Dictionary of supported service bus endpoints.
42+
/// </summary>
43+
public Dictionary<string, AzureServiceBusEndpoint> Endpoints { get; set; }
44+
45+
/// <summary>
46+
/// Information about service bus endpoint.
47+
/// </summary>
48+
public class AzureServiceBusEndpoint
49+
{
50+
/// <summary>
51+
/// Endpoint name.
52+
/// </summary>
53+
public string Name { get; set; }
54+
55+
/// <summary>
56+
/// Endpoint type.
57+
/// </summary>
58+
/// <remarks>Only for information purposes, can be empty.</remarks>
59+
public string Type { get; set; }
60+
}
61+
}
62+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
using System;
2+
3+
namespace Kros.MassTransit.AzureServiceBus
4+
{
5+
/// <summary>
6+
/// Default configuration for Azure via Mass Transit.
7+
/// </summary>
8+
public static class ConfigDefaults
9+
{
10+
/// <summary>
11+
/// Default time to live for Azure service bus token.
12+
/// </summary>
13+
public static TimeSpan TokenTimeToLive { get; } = TimeSpan.FromMinutes(1);
14+
15+
/// <summary>
16+
/// Default message time to live in the queue.
17+
/// </summary>
18+
public static readonly TimeSpan MessageTimeToLive = TimeSpan.FromMinutes(3);
19+
20+
/// <summary>
21+
/// Default setting for move messages to the dead letter queue on expiration (time to live exceeded).
22+
/// </summary>
23+
public const bool EnableDeadLetteringOnMessageExpiration = true;
24+
25+
/// <summary>
26+
/// Default lock duration for messages read from the queue.
27+
/// </summary>
28+
public static readonly TimeSpan LockDuration = TimeSpan.FromSeconds(30);
29+
30+
/// <summary>
31+
/// Default value for if the queue should be deleted if idle.
32+
/// </summary>
33+
public static readonly TimeSpan AutoDeleteOnIdle = TimeSpan.FromMinutes(10);
34+
35+
/// <summary>
36+
/// Default maximum delivery count. A message is automatically deadlettered after this number of deliveries.
37+
/// </summary>
38+
public const int MaxDeliveryCount = 10;
39+
40+
/// <summary>
41+
/// Default time window for duplicate history.
42+
/// </summary>
43+
public static readonly TimeSpan DuplicateDetectionWindow = TimeSpan.FromMinutes(5);
44+
}
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
using Kros.Utils;
2+
using MassTransit;
3+
using MassTransit.Azure.ServiceBus.Core;
4+
using MassTransit.ConsumeConfigurators;
5+
using System;
6+
7+
namespace Kros.MassTransit.AzureServiceBus.Endpoints
8+
{
9+
/// <summary>
10+
/// Class representing Azure service bus endpoint.
11+
/// </summary>
12+
public abstract class Endpoint
13+
{
14+
protected Endpoint(string name)
15+
{
16+
Name = Check.NotNullOrWhiteSpace(name, nameof(name));
17+
}
18+
19+
/// <summary>
20+
/// Endpoint name.
21+
/// </summary>
22+
public string Name { get; }
23+
24+
/// <summary>
25+
/// Adds new consumer to endpoint.
26+
/// </summary>
27+
/// <typeparam name="TMessage">Type of message processed by consumer.</typeparam>
28+
/// <param name="handler">Delegate to process message.</param>
29+
public abstract void AddConsumer<TMessage>(MessageHandler<TMessage> handler) where TMessage : class;
30+
31+
/// <summary>
32+
/// Adds new consumer to endpoint.
33+
/// </summary>
34+
/// <typeparam name="TConsumer">Type of message consumer.</typeparam>
35+
/// <param name="configure">Delegate to configure consumer.</param>
36+
public abstract void AddConsumer<TConsumer>(Action<IConsumerConfigurator<TConsumer>> configure = null)
37+
where TConsumer : class, IConsumer;
38+
39+
/// <summary>
40+
/// Adds new consumer with dependencies to endpoint.
41+
/// </summary>
42+
/// <typeparam name="TConsumer">Type of message consumer.</typeparam>
43+
/// <param name="provider">Service provider (DI container).</param>
44+
/// <param name="configure">Delegate to configure consumer.</param>
45+
public abstract void AddConsumer<TConsumer>(
46+
IServiceProvider provider,
47+
Action<IConsumerConfigurator<TConsumer>> configure = null) where TConsumer : class, IConsumer;
48+
49+
/// <summary>
50+
/// Sets endpoint and its consumers during service bus initialization.
51+
/// </summary>
52+
/// <param name="busCfg">Service bus configuration.</param>
53+
/// <param name="host">Service bus host.</param>
54+
public abstract void SetEndpoint(IServiceBusBusFactoryConfigurator busCfg, IServiceBusHost host);
55+
}
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
using MassTransit;
2+
using MassTransit.Azure.ServiceBus.Core;
3+
using MassTransit.ConsumeConfigurators;
4+
using System;
5+
using System.Collections.Generic;
6+
7+
namespace Kros.MassTransit.AzureServiceBus.Endpoints
8+
{
9+
/// <summary>
10+
/// Azure service bus receive endpoint.
11+
/// </summary>
12+
public class ReceiveEndpoint : Endpoint
13+
{
14+
private readonly Action<IServiceBusReceiveEndpointConfigurator> _configurator;
15+
private readonly List<Action<IServiceBusReceiveEndpointConfigurator>> _consumers;
16+
17+
/// <summary>
18+
/// Ctor.
19+
/// </summary>
20+
/// <param name="queueName">Name of queue.</param>
21+
/// <param name="configurator">Delegate to configure endpoint.</param>
22+
public ReceiveEndpoint(string queueName, Action<IServiceBusReceiveEndpointConfigurator> configurator)
23+
: base(queueName)
24+
{
25+
_configurator = configurator;
26+
_consumers = new List<Action<IServiceBusReceiveEndpointConfigurator>>();
27+
}
28+
29+
/// <inheritdoc />
30+
public override void AddConsumer<TMessage>(MessageHandler<TMessage> handler)
31+
=> _consumers.Add(endpointConfig => endpointConfig.Handler(handler));
32+
33+
/// <inheritdoc />
34+
public override void AddConsumer<TConsumer>(Action<IConsumerConfigurator<TConsumer>> configure = null)
35+
=> _consumers.Add(endpointConfig => endpointConfig.Consumer(() => Activator.CreateInstance<TConsumer>(), configure));
36+
37+
/// <inheritdoc />
38+
public override void AddConsumer<TConsumer>(
39+
IServiceProvider provider,
40+
Action<IConsumerConfigurator<TConsumer>> configure = null)
41+
=> _consumers.Add(endpointConfig => endpointConfig.Consumer(provider, configure));
42+
43+
/// <inheritdoc />
44+
public override void SetEndpoint(IServiceBusBusFactoryConfigurator busCfg, IServiceBusHost host)
45+
=> busCfg.ReceiveEndpoint(host, Name, endpointConfig =>
46+
{
47+
_configurator?.Invoke(endpointConfig);
48+
49+
foreach (Action<IServiceBusReceiveEndpointConfigurator> consumerConfig in _consumers)
50+
{
51+
consumerConfig?.Invoke(endpointConfig);
52+
}
53+
});
54+
}
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
using MassTransit;
2+
using MassTransit.Azure.ServiceBus.Core;
3+
using MassTransit.ConsumeConfigurators;
4+
using System;
5+
using System.Collections.Generic;
6+
7+
namespace Kros.MassTransit.AzureServiceBus.Endpoints
8+
{
9+
/// <summary>
10+
/// Azure service bus subscription endpoint.
11+
/// </summary>
12+
public class SubscriptionEndpoint<TMessage> : Endpoint where TMessage : class
13+
{
14+
private readonly Action<IServiceBusSubscriptionEndpointConfigurator> _configurator;
15+
private readonly List<Action<IServiceBusSubscriptionEndpointConfigurator>> _consumers;
16+
17+
/// <summary>
18+
/// Ctor.
19+
/// </summary>
20+
/// <param name="subscriptionName">Name of subscription.</param>
21+
/// <param name="configurator">Delegate to configure endpoint.</param>
22+
public SubscriptionEndpoint(string subscriptionName, Action<IServiceBusSubscriptionEndpointConfigurator> configurator)
23+
: base(subscriptionName)
24+
{
25+
_configurator = configurator;
26+
_consumers = new List<Action<IServiceBusSubscriptionEndpointConfigurator>>();
27+
}
28+
29+
/// <inheritdoc />
30+
public override void AddConsumer<TMessage2>(MessageHandler<TMessage2> handler)
31+
=> _consumers.Add(endpointConfig => endpointConfig.Handler(handler));
32+
33+
/// <inheritdoc />
34+
public override void AddConsumer<TConsumer>(Action<IConsumerConfigurator<TConsumer>> configure = null)
35+
=> _consumers.Add(endpointConfig => endpointConfig.Consumer(() => Activator.CreateInstance<TConsumer>(), configure));
36+
37+
/// <inheritdoc />
38+
public override void AddConsumer<TConsumer>(
39+
IServiceProvider provider,
40+
Action<IConsumerConfigurator<TConsumer>> configure = null)
41+
=> _consumers.Add(endpointConfig => endpointConfig.Consumer(provider, configure));
42+
43+
/// <inheritdoc />
44+
public override void SetEndpoint(IServiceBusBusFactoryConfigurator busCfg, IServiceBusHost host)
45+
=> busCfg.SubscriptionEndpoint<TMessage>(host, Name, endpointConfig =>
46+
{
47+
_configurator?.Invoke(endpointConfig);
48+
49+
foreach (Action<IServiceBusSubscriptionEndpointConfigurator> consumerConfig in _consumers)
50+
{
51+
consumerConfig?.Invoke(endpointConfig);
52+
}
53+
});
54+
}
55+
}

0 commit comments

Comments
 (0)