Skip to content

Commit 6ecfe7d

Browse files
committed
Introduce event publishing management for integration components and adjust Azure Service Bus customization to install the topics during provisioning
1 parent 22b7efd commit 6ecfe7d

19 files changed

Lines changed: 162 additions & 80 deletions

src/ServiceControl.Transports.ASBS/ASBSTransportCustomization.cs

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
namespace ServiceControl.Transports.ASBS
22
{
3+
using System.Collections.Generic;
34
using System.Linq;
45
using System.Text.Json;
6+
using System.Threading.Tasks;
7+
using Azure.Messaging.ServiceBus;
8+
using Azure.Messaging.ServiceBus.Administration;
59
using BrokerThroughput;
610
using Configuration;
711
using Microsoft.Extensions.DependencyInjection;
@@ -71,18 +75,7 @@ protected override void AddTransportForPrimaryCore(IServiceCollection services,
7175
{
7276
TopicToPublishTo = connectionSettings.TopicName,
7377
TopicToSubscribeOn = connectionSettings.TopicName,
74-
EventsToMigrateMap =
75-
[
76-
"ServiceControl.Contracts.CustomCheckFailed",
77-
"ServiceControl.Contracts.CustomCheckSucceeded",
78-
"ServiceControl.Contracts.HeartbeatRestored",
79-
"ServiceControl.Contracts.HeartbeatStopped",
80-
"ServiceControl.Contracts.FailedMessagesArchived",
81-
"ServiceControl.Contracts.FailedMessagesUnArchived",
82-
"ServiceControl.Contracts.MessageFailed",
83-
"ServiceControl.Contracts.MessageFailureResolvedByRetry",
84-
"ServiceControl.Contracts.MessageFailureResolvedManually"
85-
]
78+
EventsToMigrateMap = [.. transportSettings.EventTypesPublished.Select(t => t.FullName)]
8679
});
8780
}
8881
else if (SettingsReader.TryRead<string>(serviceBusRootNamespace, "Topology", out var topologyJson))
@@ -104,5 +97,47 @@ protected override void AddTransportForMonitoringCore(IServiceCollection service
10497
services.AddSingleton<IProvideQueueLength, QueueLengthProvider>();
10598
services.AddHostedService(provider => provider.GetRequiredService<IProvideQueueLength>());
10699
}
100+
101+
public override async Task ProvisionQueues(TransportSettings transportSettings, IEnumerable<string> additionalQueues)
102+
{
103+
await base.ProvisionQueues(transportSettings, additionalQueues);
104+
105+
if (transportSettings.EventTypesPublished.Count == 0)
106+
{
107+
return;
108+
}
109+
110+
var connectionSettings = ConnectionStringParser.Parse(transportSettings.ConnectionString);
111+
112+
var managementClient = connectionSettings.AuthenticationMethod.BuildManagementClient();
113+
114+
var creationTasks = new List<Task>(transportSettings.EventTypesPublished.Count);
115+
foreach (var publishedTopic in transportSettings.EventTypesPublished)
116+
{
117+
creationTasks.Add(CreateTopic(publishedTopic.FullName));
118+
}
119+
await Task.WhenAll(creationTasks);
120+
121+
async Task CreateTopic(string publishedTopic)
122+
{
123+
var topicToPublishTo = new CreateTopicOptions(connectionSettings.HierarchyNamespace != null
124+
? $"{connectionSettings.HierarchyNamespace}/{publishedTopic}"
125+
: publishedTopic)
126+
{
127+
EnableBatchedOperations = true,
128+
MaxSizeInMegabytes = 5 * 1024, // we are currently not configuring this in the connection string so it uses the same default as the transport
129+
EnablePartitioning = connectionSettings.EnablePartitioning,
130+
};
131+
132+
try
133+
{
134+
await managementClient.CreateTopicAsync(topicToPublishTo).ConfigureAwait(false);
135+
}
136+
catch (ServiceBusException sbe) when (sbe.Reason == ServiceBusFailureReason.MessagingEntityAlreadyExists || sbe.IsTransient)
137+
{
138+
// carry on
139+
}
140+
}
141+
}
107142
}
108143
}

src/ServiceControl.Transports.Tests/ApprovalFiles/APIApprovals.ServiceControlTransport.approved.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ namespace ServiceControl.Transports
9696
public string ConnectionString { get; set; }
9797
public string EndpointName { get; set; }
9898
public string ErrorQueue { get; set; }
99+
public System.Collections.Generic.IReadOnlySet<System.Type> EventTypesPublished { get; init; }
99100
public int? MaxConcurrency { get; set; }
100101
public bool RunCustomChecks { get; set; }
101102
public string TransportType { get; set; }

src/ServiceControl.Transports/TransportSettings.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace ServiceControl.Transports
22
{
33
using System;
4+
using System.Collections.Generic;
45
using System.Runtime.Loader;
56
using NServiceBus.Settings;
67

@@ -18,6 +19,8 @@ public class TransportSettings : SettingsHolder
1819

1920
public bool RunCustomChecks { get; set; }
2021

22+
public IReadOnlySet<Type> EventTypesPublished { get; init; } = new HashSet<Type>();
23+
2124
public string ErrorQueue
2225
{
2326
set;
Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
namespace Particular.ServiceControl
22
{
3+
using System;
34
using System.Collections.Generic;
45

5-
class ComponentInstallationContext : IComponentInstallationContext
6+
public class ComponentInstallationContext : IComponentInstallationContext
67
{
7-
public List<string> Queues { get; } = [];
8+
public IReadOnlyCollection<string> Queues => queuesToCreate;
9+
public IReadOnlySet<Type> EventTypesPublished => eventTypePublished;
810

9-
public void CreateQueue(string queueName) => Queues.Add(queueName);
11+
public void CreateQueue(string queueName) => queuesToCreate.Add(queueName);
12+
public void AddEventPublished<TEvent>() => eventTypePublished.Add(typeof(TEvent));
13+
14+
readonly List<string> queuesToCreate = [];
15+
readonly HashSet<Type> eventTypePublished = [];
1016
}
1117
}

src/ServiceControl/CustomChecks/CustomChecksComponent.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,28 @@
11
namespace ServiceControl.CustomChecks
22
{
33
using Connection;
4+
using Contracts;
45
using ExternalIntegrations;
56
using Microsoft.Extensions.DependencyInjection;
67
using Microsoft.Extensions.Hosting;
8+
using NServiceBus;
79
using Particular.ServiceControl;
810
using ServiceBus.Management.Infrastructure.Settings;
911
using Transports;
1012

1113
class CustomChecksComponent : ServiceControlComponent
1214
{
13-
public override void Configure(Settings settings, ITransportCustomization transportCustomization, IHostApplicationBuilder hostBuilder)
15+
public override void Setup(Settings settings, IComponentInstallationContext context, IHostApplicationBuilder hostBuilder)
16+
{
17+
// Integration Events
18+
if (!settings.DisableExternalIntegrationsPublishing)
19+
{
20+
context.AddEventPublished<CustomCheckFailed>();
21+
context.AddEventPublished<CustomCheckSucceeded>();
22+
}
23+
}
24+
25+
public override void Configure(Settings settings, EndpointConfiguration endpointConfiguration, ITransportCustomization transportCustomization, IHostApplicationBuilder hostBuilder)
1426
{
1527
hostBuilder.Services.AddIntegrationEventPublisher<CustomCheckFailedPublisher>();
1628
hostBuilder.Services.AddIntegrationEventPublisher<CustomCheckSucceededPublisher>();

src/ServiceControl/EventLog/EventLogComponent.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@
22
{
33
using Microsoft.Extensions.DependencyInjection;
44
using Microsoft.Extensions.Hosting;
5+
using NServiceBus;
56
using Particular.ServiceControl;
67
using ServiceBus.Management.Infrastructure.Settings;
78
using ServiceControl.Infrastructure.DomainEvents;
89
using Transports;
910

1011
class EventLogComponent : ServiceControlComponent
1112
{
12-
public override void Configure(Settings settings, ITransportCustomization transportCustomization, IHostApplicationBuilder hostBuilder)
13+
public override void Configure(Settings settings, EndpointConfiguration endpointConfiguration, ITransportCustomization transportCustomization, IHostApplicationBuilder hostBuilder)
1314
{
1415
var services = hostBuilder.Services;
1516
services.AddSingleton<EventLogMappings>();
Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,26 @@
11
namespace ServiceControl.ExternalIntegrations
22
{
3+
using Infrastructure.DomainEvents;
4+
using Microsoft.Extensions.DependencyInjection;
35
using Microsoft.Extensions.Hosting;
6+
using NServiceBus;
47
using Particular.ServiceControl;
58
using ServiceBus.Management.Infrastructure.Settings;
69
using Transports;
710

811
class ExternalIntegrationsComponent : ServiceControlComponent
912
{
10-
public override void Configure(Settings settings, ITransportCustomization transportCustomization, IHostApplicationBuilder hostBuilder)
13+
public override void Configure(Settings settings, EndpointConfiguration endpointConfiguration, ITransportCustomization transportCustomization, IHostApplicationBuilder hostBuilder)
1114
{
1215
var services = hostBuilder.Services;
1316
services.AddEventLogMapping<ExternalIntegrationEventFailedToBePublishedDefinition>();
17+
18+
if (!settings.DisableExternalIntegrationsPublishing)
19+
{
20+
services.AddHostedService<EventDispatcherHostedService>();
21+
services.AddDomainEventHandler<IntegrationEventWriter>();
22+
endpointConfiguration.EnableFeature<ExternalIntegrationsFeature>();
23+
}
1424
}
1525
}
1626
}

src/ServiceControl/ExternalIntegrations/ExternalIntegrationsHostBuilderExtensions.cs

Lines changed: 0 additions & 17 deletions
This file was deleted.

src/ServiceControl/HostApplicationBuilderExtensions.cs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ namespace Particular.ServiceControl
44
using System.Diagnostics;
55
using System.Runtime.InteropServices;
66
using global::ServiceControl.CustomChecks;
7-
using global::ServiceControl.ExternalIntegrations;
87
using global::ServiceControl.Hosting;
98
using global::ServiceControl.Infrastructure;
109
using global::ServiceControl.Infrastructure.BackgroundTasks;
@@ -45,8 +44,15 @@ public static void AddServiceControl(this IHostApplicationBuilder hostBuilder, S
4544
hostBuilder.Logging.ClearProviders();
4645
hostBuilder.Logging.ConfigureLogging(settings.LoggingSettings.LogLevel);
4746

47+
var componentSetupContext = new ComponentInstallationContext();
48+
var serviceControlComponents = ServiceControlMainInstance.Components;
49+
foreach (ServiceControlComponent component in serviceControlComponents)
50+
{
51+
component.Setup(settings, componentSetupContext, hostBuilder);
52+
}
53+
4854
var services = hostBuilder.Services;
49-
var transportSettings = settings.ToTransportSettings();
55+
var transportSettings = settings.ToTransportSettings(componentSetupContext);
5056
var transportCustomization = TransportFactory.Create(transportSettings);
5157
transportCustomization.AddTransportForPrimary(services, transportSettings);
5258

@@ -78,14 +84,6 @@ public static void AddServiceControl(this IHostApplicationBuilder hostBuilder, S
7884
services.AddPersistence(settings);
7985
services.AddMetrics(settings.PrintMetrics);
8086

81-
NServiceBusFactory.Configure(settings, transportCustomization, transportSettings, configuration);
82-
hostBuilder.UseNServiceBus(configuration);
83-
84-
if (!settings.DisableExternalIntegrationsPublishing)
85-
{
86-
hostBuilder.AddExternalIntegrationEvents();
87-
}
88-
8987
hostBuilder.AddServicePulseSignalRNotifier();
9088
hostBuilder.AddEmailNotifications();
9189
hostBuilder.AddAsyncTimer();
@@ -101,7 +99,10 @@ public static void AddServiceControl(this IHostApplicationBuilder hostBuilder, S
10199
hostBuilder.AddWindowsServiceWithRequestTimeout();
102100
}
103101

104-
hostBuilder.AddServiceControlComponents(settings, transportCustomization, ServiceControlMainInstance.Components);
102+
hostBuilder.AddServiceControlComponents(componentSetupContext, settings, configuration, transportCustomization, serviceControlComponents);
103+
104+
NServiceBusFactory.Configure(settings, transportCustomization, transportSettings, configuration);
105+
hostBuilder.UseNServiceBus(configuration);
105106
}
106107

107108
public static void AddServiceControlInstallers(this IHostApplicationBuilder hostApplicationBuilder, Settings settings)

src/ServiceControl/Hosting/Commands/SetupCommand.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public override async Task Execute(HostArguments args, Settings settings)
4040
}
4141
else
4242
{
43-
var transportSettings = settings.ToTransportSettings();
43+
var transportSettings = settings.ToTransportSettings(componentSetupContext);
4444
transportSettings.RunCustomChecks = false;
4545
var transportCustomization = TransportFactory.Create(transportSettings);
4646

0 commit comments

Comments
 (0)