Skip to content

Commit edd2994

Browse files
committed
Introduce event publishing management for integration components and adjust Azure Service Bus customization to install the topics during provisioning
1 parent 22b7efd commit edd2994

14 files changed

Lines changed: 140 additions & 62 deletions

File tree

src/ServiceControl.Transports.ASBS/ASBSTransportCustomization.cs

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
namespace ServiceControl.Transports.ASBS
22
{
3+
using System.Collections.Generic;
34
using System.Linq;
45
using System.Text.Json;
6+
using System.Threading.Tasks;
7+
using Azure.Messaging.ServiceBus;
8+
using Azure.Messaging.ServiceBus.Administration;
59
using BrokerThroughput;
610
using Configuration;
711
using Microsoft.Extensions.DependencyInjection;
@@ -71,18 +75,7 @@ protected override void AddTransportForPrimaryCore(IServiceCollection services,
7175
{
7276
TopicToPublishTo = connectionSettings.TopicName,
7377
TopicToSubscribeOn = connectionSettings.TopicName,
74-
EventsToMigrateMap =
75-
[
76-
"ServiceControl.Contracts.CustomCheckFailed",
77-
"ServiceControl.Contracts.CustomCheckSucceeded",
78-
"ServiceControl.Contracts.HeartbeatRestored",
79-
"ServiceControl.Contracts.HeartbeatStopped",
80-
"ServiceControl.Contracts.FailedMessagesArchived",
81-
"ServiceControl.Contracts.FailedMessagesUnArchived",
82-
"ServiceControl.Contracts.MessageFailed",
83-
"ServiceControl.Contracts.MessageFailureResolvedByRetry",
84-
"ServiceControl.Contracts.MessageFailureResolvedManually"
85-
]
78+
EventsToMigrateMap = [.. transportSettings.EventTypesPublished.Select(t => t.FullName)]
8679
});
8780
}
8881
else if (SettingsReader.TryRead<string>(serviceBusRootNamespace, "Topology", out var topologyJson))
@@ -104,5 +97,47 @@ protected override void AddTransportForMonitoringCore(IServiceCollection service
10497
services.AddSingleton<IProvideQueueLength, QueueLengthProvider>();
10598
services.AddHostedService(provider => provider.GetRequiredService<IProvideQueueLength>());
10699
}
100+
101+
public override async Task ProvisionQueues(TransportSettings transportSettings, IEnumerable<string> additionalQueues)
102+
{
103+
await base.ProvisionQueues(transportSettings, additionalQueues);
104+
105+
if (transportSettings.EventTypesPublished.Count == 0)
106+
{
107+
return;
108+
}
109+
110+
var connectionSettings = ConnectionStringParser.Parse(transportSettings.ConnectionString);
111+
112+
var managementClient = connectionSettings.AuthenticationMethod.BuildManagementClient();
113+
114+
var creationTasks = new List<Task>(transportSettings.EventTypesPublished.Count);
115+
foreach (var publishedTopic in transportSettings.EventTypesPublished)
116+
{
117+
creationTasks.Add(CreateTopic(publishedTopic.FullName));
118+
}
119+
await Task.WhenAll(creationTasks);
120+
121+
async Task CreateTopic(string publishedTopic)
122+
{
123+
var topicToPublishTo = new CreateTopicOptions(connectionSettings.HierarchyNamespace != null
124+
? $"{connectionSettings.HierarchyNamespace}/{publishedTopic}"
125+
: publishedTopic)
126+
{
127+
EnableBatchedOperations = true,
128+
MaxSizeInMegabytes = 5 * 1024, // we are currently not configuring this in the connection string so it uses the same default as the transport
129+
EnablePartitioning = connectionSettings.EnablePartitioning,
130+
};
131+
132+
try
133+
{
134+
await managementClient.CreateTopicAsync(topicToPublishTo).ConfigureAwait(false);
135+
}
136+
catch (ServiceBusException sbe) when (sbe.Reason == ServiceBusFailureReason.MessagingEntityAlreadyExists || sbe.IsTransient)
137+
{
138+
// carry on
139+
}
140+
}
141+
}
107142
}
108143
}

src/ServiceControl.Transports.Tests/ApprovalFiles/APIApprovals.ServiceControlTransport.approved.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ namespace ServiceControl.Transports
9696
public string ConnectionString { get; set; }
9797
public string EndpointName { get; set; }
9898
public string ErrorQueue { get; set; }
99+
public System.Collections.Generic.IReadOnlySet<System.Type> EventTypesPublished { get; init; }
99100
public int? MaxConcurrency { get; set; }
100101
public bool RunCustomChecks { get; set; }
101102
public string TransportType { get; set; }

src/ServiceControl.Transports/TransportSettings.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace ServiceControl.Transports
22
{
33
using System;
4+
using System.Collections.Generic;
45
using System.Runtime.Loader;
56
using NServiceBus.Settings;
67

@@ -18,6 +19,8 @@ public class TransportSettings : SettingsHolder
1819

1920
public bool RunCustomChecks { get; set; }
2021

22+
public IReadOnlySet<Type> EventTypesPublished { get; init; } = new HashSet<Type>();
23+
2124
public string ErrorQueue
2225
{
2326
set;
Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
namespace Particular.ServiceControl
22
{
3+
using System;
34
using System.Collections.Generic;
45

5-
class ComponentInstallationContext : IComponentInstallationContext
6+
public class ComponentInstallationContext : IComponentInstallationContext
67
{
7-
public List<string> Queues { get; } = [];
8+
public IReadOnlyCollection<string> Queues => queuesToCreate;
9+
public IReadOnlySet<Type> EventTypesPublished => eventTypePublished;
810

9-
public void CreateQueue(string queueName) => Queues.Add(queueName);
11+
public void CreateQueue(string queueName) => queuesToCreate.Add(queueName);
12+
public void AddEventPublished<TEvent>() => eventTypePublished.Add(typeof(TEvent));
13+
14+
readonly List<string> queuesToCreate = [];
15+
readonly HashSet<Type> eventTypePublished = [];
1016
}
1117
}

src/ServiceControl/CustomChecks/CustomChecksComponent.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace ServiceControl.CustomChecks
22
{
33
using Connection;
4+
using Contracts;
45
using ExternalIntegrations;
56
using Microsoft.Extensions.DependencyInjection;
67
using Microsoft.Extensions.Hosting;
@@ -10,6 +11,16 @@
1011

1112
class CustomChecksComponent : ServiceControlComponent
1213
{
14+
public override void Setup(Settings settings, IComponentInstallationContext context, IHostApplicationBuilder hostBuilder)
15+
{
16+
// Integration Events
17+
if (!settings.DisableExternalIntegrationsPublishing)
18+
{
19+
context.AddEventPublished<CustomCheckFailed>();
20+
context.AddEventPublished<CustomCheckSucceeded>();
21+
}
22+
}
23+
1324
public override void Configure(Settings settings, ITransportCustomization transportCustomization, IHostApplicationBuilder hostBuilder)
1425
{
1526
hostBuilder.Services.AddIntegrationEventPublisher<CustomCheckFailedPublisher>();

src/ServiceControl/ExternalIntegrations/ExternalIntegrationsComponent.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
namespace ServiceControl.ExternalIntegrations
22
{
3+
using Infrastructure.DomainEvents;
4+
using Microsoft.Extensions.DependencyInjection;
35
using Microsoft.Extensions.Hosting;
46
using Particular.ServiceControl;
57
using ServiceBus.Management.Infrastructure.Settings;
@@ -11,6 +13,12 @@ public override void Configure(Settings settings, ITransportCustomization transp
1113
{
1214
var services = hostBuilder.Services;
1315
services.AddEventLogMapping<ExternalIntegrationEventFailedToBePublishedDefinition>();
16+
17+
if (!settings.DisableExternalIntegrationsPublishing)
18+
{
19+
services.AddHostedService<EventDispatcherHostedService>();
20+
services.AddDomainEventHandler<IntegrationEventWriter>();
21+
}
1422
}
1523
}
1624
}

src/ServiceControl/ExternalIntegrations/ExternalIntegrationsHostBuilderExtensions.cs

Lines changed: 0 additions & 17 deletions
This file was deleted.

src/ServiceControl/HostApplicationBuilderExtensions.cs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ namespace Particular.ServiceControl
44
using System.Diagnostics;
55
using System.Runtime.InteropServices;
66
using global::ServiceControl.CustomChecks;
7-
using global::ServiceControl.ExternalIntegrations;
87
using global::ServiceControl.Hosting;
98
using global::ServiceControl.Infrastructure;
109
using global::ServiceControl.Infrastructure.BackgroundTasks;
@@ -45,8 +44,15 @@ public static void AddServiceControl(this IHostApplicationBuilder hostBuilder, S
4544
hostBuilder.Logging.ClearProviders();
4645
hostBuilder.Logging.ConfigureLogging(settings.LoggingSettings.LogLevel);
4746

47+
var componentSetupContext = new ComponentInstallationContext();
48+
var serviceControlComponents = ServiceControlMainInstance.Components;
49+
foreach (ServiceControlComponent component in serviceControlComponents)
50+
{
51+
component.Setup(settings, componentSetupContext, hostBuilder);
52+
}
53+
4854
var services = hostBuilder.Services;
49-
var transportSettings = settings.ToTransportSettings();
55+
var transportSettings = settings.ToTransportSettings(componentSetupContext);
5056
var transportCustomization = TransportFactory.Create(transportSettings);
5157
transportCustomization.AddTransportForPrimary(services, transportSettings);
5258

@@ -81,11 +87,6 @@ public static void AddServiceControl(this IHostApplicationBuilder hostBuilder, S
8187
NServiceBusFactory.Configure(settings, transportCustomization, transportSettings, configuration);
8288
hostBuilder.UseNServiceBus(configuration);
8389

84-
if (!settings.DisableExternalIntegrationsPublishing)
85-
{
86-
hostBuilder.AddExternalIntegrationEvents();
87-
}
88-
8990
hostBuilder.AddServicePulseSignalRNotifier();
9091
hostBuilder.AddEmailNotifications();
9192
hostBuilder.AddAsyncTimer();
@@ -101,7 +102,7 @@ public static void AddServiceControl(this IHostApplicationBuilder hostBuilder, S
101102
hostBuilder.AddWindowsServiceWithRequestTimeout();
102103
}
103104

104-
hostBuilder.AddServiceControlComponents(settings, transportCustomization, ServiceControlMainInstance.Components);
105+
hostBuilder.AddServiceControlComponents(componentSetupContext, settings, transportCustomization, serviceControlComponents);
105106
}
106107

107108
public static void AddServiceControlInstallers(this IHostApplicationBuilder hostApplicationBuilder, Settings settings)

src/ServiceControl/Hosting/Commands/SetupCommand.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public override async Task Execute(HostArguments args, Settings settings)
4040
}
4141
else
4242
{
43-
var transportSettings = settings.ToTransportSettings();
43+
var transportSettings = settings.ToTransportSettings(componentSetupContext);
4444
transportSettings.RunCustomChecks = false;
4545
var transportCustomization = TransportFactory.Create(transportSettings);
4646

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
namespace Particular.ServiceControl
22
{
3-
interface IComponentInstallationContext
3+
public interface IComponentInstallationContext
44
{
55
void CreateQueue(string queueName);
6+
7+
void AddEventPublished<TEvent>();
68
}
79
}

0 commit comments

Comments
 (0)