Skip to content

Commit c861da5

Browse files
committed
Add additional event that is triggered after each service broker receive operation
1 parent 93ac947 commit c861da5

13 files changed

Lines changed: 82 additions & 59 deletions

src/Dibix.Worker.Abstractions/BackgroundService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public abstract class BackgroundService : HostedServiceListener, IHostedService,
1111
private Task? _executingTask;
1212
private CancellationTokenSource? _stoppingCts;
1313

14-
protected BackgroundService(IHostedServiceRegistrar hostedServiceRegistrar, ILogger logger) : base(hostedServiceRegistrar, logger) { }
14+
protected BackgroundService(IHostedServiceEvents hostedServiceEvents, ILogger logger) : base(hostedServiceEvents, logger) { }
1515

1616
protected sealed override Task StartListenerAsync(CancellationToken cancellationToken)
1717
{

src/Dibix.Worker.Abstractions/HostedServiceListener.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,16 @@ namespace Dibix.Worker.Abstractions
77
{
88
public abstract class HostedServiceListener : HostedService, IHostedService
99
{
10-
private readonly IHostedServiceRegistrar _hostedServiceRegistrar;
10+
private readonly IHostedServiceEvents _hostedServiceEvents;
1111

12-
protected HostedServiceListener(IHostedServiceRegistrar hostedServiceRegistrar, ILogger logger) : base(logger)
12+
protected HostedServiceListener(IHostedServiceEvents hostedServiceEvents, ILogger logger) : base(logger)
1313
{
14-
_hostedServiceRegistrar = hostedServiceRegistrar;
14+
_hostedServiceEvents = hostedServiceEvents;
1515
}
1616

1717
protected sealed override async Task StartServiceAsync(CancellationToken cancellationToken)
1818
{
19-
await _hostedServiceRegistrar.RegisterHostedService(GetType().FullName, cancellationToken).ConfigureAwait(false);
19+
await _hostedServiceEvents.OnWorkerStarted(GetType().FullName, cancellationToken).ConfigureAwait(false);
2020
await StartListenerAsync(cancellationToken).ConfigureAwait(false);
2121
}
2222

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
using System.Threading;
2+
using System.Threading.Tasks;
3+
4+
namespace Dibix.Worker.Abstractions
5+
{
6+
public interface IHostedServiceEvents
7+
{
8+
Task OnWorkerStarted(string fullName, CancellationToken cancellationToken);
9+
Task OnServiceBrokerIterationCompleted(string fullName, CancellationToken cancellationToken);
10+
}
11+
}

src/Dibix.Worker.Abstractions/IHostedServiceRegistrar.cs

Lines changed: 0 additions & 10 deletions
This file was deleted.

src/Dibix.Worker.Abstractions/IWorkerHostExtensionConfigurationBuilder.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@ public interface IWorkerHostExtensionConfigurationBuilder
1212
IWorkerHostExtensionConfigurationBuilder ConfigureConnectionString(Func<string?, string?> configure);
1313
IWorkerHostExtensionConfigurationBuilder OnHostStarted(Func<IWorkerDependencyContext, Task> handler);
1414
IWorkerHostExtensionConfigurationBuilder OnHostStopped(Func<IWorkerDependencyContext, Task> handler);
15-
IWorkerHostExtensionConfigurationBuilder OnWorkerRegistered(OnWorkerRegistered handler);
15+
IWorkerHostExtensionConfigurationBuilder OnWorkerStarted(OnWorkerStarted handler);
16+
IWorkerHostExtensionConfigurationBuilder OnServiceBrokerIterationCompleted(OnServiceBrokerIterationCompleted handler);
1617
}
1718

18-
public delegate Task OnWorkerRegistered(string implementationName, IWorkerDependencyContext dependencyContext, CancellationToken cancellationToken);
19+
public delegate Task OnWorkerStarted(string implementationName, IWorkerDependencyContext dependencyContext, CancellationToken cancellationToken);
20+
21+
public delegate Task OnServiceBrokerIterationCompleted(string implementationName, IWorkerDependencyContext dependencyContext, CancellationToken cancellationToken);
1922
}

src/Dibix.Worker.Abstractions/ServiceBrokerSignalSubscriber.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ namespace Dibix.Worker.Abstractions
88
{
99
public abstract class ServiceBrokerSignalSubscriber : ServiceBrokerSubscriber<object>, IHostedService
1010
{
11-
protected ServiceBrokerSignalSubscriber(IWorkerScopeFactory scopeFactory, IHostedServiceRegistrar hostedServiceRegistrar, ILogger logger) : base(scopeFactory, hostedServiceRegistrar, logger) { }
11+
protected ServiceBrokerSignalSubscriber(IWorkerScopeFactory scopeFactory, IHostedServiceEvents hostedServiceEvents, ILogger logger) : base(scopeFactory, hostedServiceEvents, logger) { }
1212

1313
private protected sealed override Task ProcessMessages(IEnumerable<object> messages) => OnMessage();
1414

src/Dibix.Worker.Abstractions/ServiceBrokerSubscriber.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,18 @@ public abstract class ServiceBrokerSubscriber<TMessage> : BackgroundService, IHo
1919
private const int ReceiveTimeout = CommandTimeout / 2 * 1000; // ms
2020
private const int RetryOnErrorDelay = 10000; // ms
2121
private readonly IWorkerScopeFactory _scopeFactory;
22+
private readonly IHostedServiceEvents _hostedServiceEvents;
2223
#endregion
2324

2425
#region Properties
2526
protected abstract string ReceiveProcedureName { get; }
2627
#endregion
2728

2829
#region Constructor
29-
protected ServiceBrokerSubscriber(IWorkerScopeFactory scopeFactory, IHostedServiceRegistrar hostedServiceRegistrar, ILogger logger) : base(hostedServiceRegistrar, logger)
30+
protected ServiceBrokerSubscriber(IWorkerScopeFactory scopeFactory, IHostedServiceEvents hostedServiceEvents, ILogger logger) : base(hostedServiceEvents, logger)
3031
{
3132
_scopeFactory = scopeFactory;
33+
_hostedServiceEvents = hostedServiceEvents;
3234
}
3335
#endregion
3436

@@ -41,6 +43,7 @@ protected sealed override async Task ExecuteAsync(CancellationToken stoppingToke
4143
try
4244
{
4345
ICollection<TMessage> messages = await ReceiveMessageBatch(stoppingToken).ConfigureAwait(false);
46+
_ = _hostedServiceEvents.OnServiceBrokerIterationCompleted(GetType().FullName, stoppingToken);
4447
if (messages.Any())
4548
_ = ProcessMessages(messages);
4649
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using Dibix.Worker.Abstractions;
2+
3+
namespace Dibix.Worker.Host
4+
{
5+
public sealed class HostedServiceEventOptions
6+
{
7+
internal OnWorkerStarted? OnWorkerStarted { get; set; }
8+
internal OnServiceBrokerIterationCompleted? OnServiceBrokerIterationCompleted { get; set; }
9+
}
10+
}

src/Dibix.Worker.Host/Configuration/HostedServiceRegistrarOptions.cs

Lines changed: 0 additions & 9 deletions
This file was deleted.

src/Dibix.Worker.Host/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ private static async Task Main(string[] args)
3131
.AddScoped<IDatabaseAccessorFactory, ScopedDatabaseAccessorFactory>()
3232
.AddScoped<IWorkerDependencyContext, ServiceProviderWorkerDependencyContext>()
3333
.AddSingleton<IWorkerScopeFactory, ServiceScopeWorkerScopeFactory>()
34-
.AddSingleton<IHostedServiceRegistrar, DefaultHostedServiceRegistrar>()
34+
.AddSingleton<IHostedServiceEvents, HostedServiceEvents>()
3535
.AddHostedService<DatabaseOptionsMonitor>();
3636

3737
services.Configure<DatabaseOptions>(builder.Configuration.GetSection(DatabaseOptions.ConfigurationSectionName));

0 commit comments

Comments
 (0)