Skip to content

Commit 3479c35

Browse files
committed
Use generated accessor method for receive function in ServiceBrokerSubscriber
1 parent 2b72e09 commit 3479c35

16 files changed

Lines changed: 236 additions & 114 deletions

shared/Hosting/Data/ScopedConnectionDatabaseAccessorFactory.cs renamed to shared/Hosting/Data/ScopedDatabaseAccessorFactory.cs

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
using System;
2-
using System.Data.Common;
1+
using System.Data.Common;
32
using Dibix.Dapper;
43
using Microsoft.Extensions.Logging;
54

@@ -8,34 +7,38 @@ namespace Dibix.Hosting.Abstractions.Data
87
internal sealed class ScopedDatabaseAccessorFactory : IDatabaseAccessorFactory
98
{
109
private readonly IDatabaseConnectionResolver _connectionResolver;
11-
private readonly ILoggerFactory _loggerFactory;
10+
private readonly ILogger _logger;
1211

13-
public ScopedDatabaseAccessorFactory(IDatabaseConnectionResolver connectionResolver, ILoggerFactory loggerFactory)
12+
public ScopedDatabaseAccessorFactory(IDatabaseConnectionResolver connectionResolver, CreateDatabaseLogger loggerFactory)
1413
{
1514
_connectionResolver = connectionResolver;
16-
_loggerFactory = loggerFactory;
15+
_logger = loggerFactory();
1716
}
1817

1918
public IDatabaseAccessor Create()
2019
{
2120
DbConnection connection = _connectionResolver.Resolve();
22-
return new LoggingDapperDatabaseAccessor(connection, _loggerFactory, onDispose: () =>
23-
{
24-
// Disposal of the connection is responsibility of the consumer.
25-
// This is currently done by registering this as a scoped instance, that will be disposed after each request.
26-
});
21+
return new LoggingDapperDatabaseAccessor(connection, _logger);
2722
}
2823

2924
private sealed class LoggingDapperDatabaseAccessor : DapperDatabaseAccessor
3025
{
3126
private readonly ILogger _logger;
3227

33-
public LoggingDapperDatabaseAccessor(DbConnection connection, ILoggerFactory loggerFactory, Action onDispose) : base(connection, onDispose: onDispose)
28+
public LoggingDapperDatabaseAccessor(DbConnection connection, ILogger logger) : base(connection)
3429
{
35-
_logger = loggerFactory.CreateLogger("Dibix.Sql");
30+
_logger = logger;
3631
}
3732

38-
protected override void OnInfoMessage(string message) => _logger.LogDebug(message);
33+
protected override void OnInfoMessage(string message) => _logger.LogDebug($"[SQL] {message}");
34+
35+
protected override void DisposeConnection()
36+
{
37+
// Disposal of the connection is responsibility of the consumer.
38+
// This is currently done by registering this as a scoped instance, that will be disposed after each request.
39+
}
3940
}
4041
}
42+
43+
internal delegate ILogger CreateDatabaseLogger();
4144
}

src/Dibix.Dapper/DapperDatabaseAccessor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ public DynamicParametersWrapper(DynamicParameters impl, ParametersVisitor parame
196196

197197
void SqlMapper.IParameterCallbacks.OnCompleted()
198198
{
199-
_parametersVisitor.VisitOutputParameters(name => _impl.Get<object>(name));
199+
_parametersVisitor.VisitOutputParameters(_impl.Get<object>);
200200
_parameterCallbacks.OnCompleted();
201201
}
202202
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
namespace Dibix.Worker.Abstractions
7+
{
8+
public interface IServiceBrokerMessageReader
9+
{
10+
Task<IEnumerable<TMessage>> ReadMessages<TMessage>(string fullSubscriberName, Func<IDatabaseAccessorFactory, int, CancellationToken, Task<IEnumerable<TMessage>>> handler, CancellationToken cancellationToken);
11+
}
12+
}

src/Dibix.Worker.Abstractions/IWorkerDependencyContext.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ public interface IWorkerDependencyContext
88
{
99
DbConnection Connection { get; }
1010
IDatabaseAccessorFactory DatabaseAccessorFactory { get; }
11-
11+
string InitiatorFullName { get; }
12+
1213
T GetExtension<T>() where T : notnull;
1314
T GetExtension<T>(Type implementationType) where T : notnull;
1415
ILogger CreateLogger(Type loggerType);

src/Dibix.Worker.Abstractions/IWorkerHostExtensionConfigurationBuilder.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public interface IWorkerHostExtensionConfigurationBuilder
1616
IWorkerHostExtensionConfigurationBuilder OnServiceBrokerIterationCompleted(OnServiceBrokerIterationCompleted handler);
1717
}
1818

19-
public delegate Task OnWorkerStarted(string implementationName, IWorkerDependencyContext dependencyContext, CancellationToken cancellationToken);
19+
public delegate Task OnWorkerStarted(IWorkerDependencyContext dependencyContext, CancellationToken cancellationToken);
2020

21-
public delegate Task OnServiceBrokerIterationCompleted(string implementationName, IWorkerDependencyContext dependencyContext, CancellationToken cancellationToken);
21+
public delegate Task OnServiceBrokerIterationCompleted(IWorkerDependencyContext dependencyContext, CancellationToken cancellationToken);
2222
}

src/Dibix.Worker.Abstractions/IWorkerScopeFactory.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
{
33
public interface IWorkerScopeFactory
44
{
5-
IWorkerScope Create();
5+
IWorkerScope Create<TInitiator>();
6+
IWorkerScope Create(string initiatorFullName);
67
}
78
}

src/Dibix.Worker.Abstractions/ServiceBrokerSignalSubscriber.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ namespace Dibix.Worker.Abstractions
88
{
99
public abstract class ServiceBrokerSignalSubscriber : ServiceBrokerSubscriber<object>, IHostedService
1010
{
11-
protected ServiceBrokerSignalSubscriber(IWorkerScopeFactory scopeFactory, IHostedServiceEvents hostedServiceEvents, ILogger logger) : base(scopeFactory, hostedServiceEvents, logger) { }
11+
protected ServiceBrokerSignalSubscriber(IServiceBrokerMessageReader serviceBrokerMessageReader, IHostedServiceEvents hostedServiceEvents, ILogger logger) : base(serviceBrokerMessageReader, hostedServiceEvents, logger) { }
1212

1313
private protected sealed override Task ProcessMessages(IEnumerable<object> messages) => OnMessage();
1414

src/Dibix.Worker.Abstractions/ServiceBrokerSubscriber.cs

Lines changed: 9 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,8 @@
11
using System;
22
using System.Collections.Generic;
3-
using System.Data;
4-
using System.Data.Common;
5-
using System.Data.SqlClient;
63
using System.Linq;
74
using System.Threading;
85
using System.Threading.Tasks;
9-
using Dapper;
106
using Microsoft.Extensions.Hosting;
117
using Microsoft.Extensions.Logging;
128

@@ -15,22 +11,18 @@ namespace Dibix.Worker.Abstractions
1511
public abstract class ServiceBrokerSubscriber<TMessage> : BackgroundService, IHostedService
1612
{
1713
#region Fields
18-
private const int CommandTimeout = 60; // seconds
19-
private const int ReceiveTimeout = CommandTimeout / 2 * 1000; // ms
20-
private const int RetryOnErrorDelay = 10000; // ms
21-
private readonly IWorkerScopeFactory _scopeFactory;
14+
private const int RetryOnErrorDelay = 10000; // ms
15+
private readonly IServiceBrokerMessageReader _serviceBrokerMessageReader;
2216
private readonly IHostedServiceEvents _hostedServiceEvents;
23-
#endregion
24-
25-
#region Properties
26-
protected abstract string ReceiveProcedureName { get; }
17+
private readonly string _fullSubscriberName;
2718
#endregion
2819

2920
#region Constructor
30-
protected ServiceBrokerSubscriber(IWorkerScopeFactory scopeFactory, IHostedServiceEvents hostedServiceEvents, ILogger logger) : base(hostedServiceEvents, logger)
21+
protected ServiceBrokerSubscriber(IServiceBrokerMessageReader serviceBrokerMessageReader, IHostedServiceEvents hostedServiceEvents, ILogger logger) : base(hostedServiceEvents, logger)
3122
{
32-
_scopeFactory = scopeFactory;
23+
_serviceBrokerMessageReader = serviceBrokerMessageReader;
3324
_hostedServiceEvents = hostedServiceEvents;
25+
_fullSubscriberName = GetType().FullName;
3426
}
3527
#endregion
3628

@@ -66,12 +58,9 @@ protected sealed override async Task ExecuteAsync(CancellationToken stoppingToke
6658
}
6759
}
6860

69-
protected abstract Task ProcessMessage(TMessage message);
61+
protected abstract Task<IEnumerable<TMessage>> ReceiveMessages(IDatabaseAccessorFactory databaseAccessorFactory, int receiveTimeout, CancellationToken cancellationToken);
7062

71-
protected virtual void CollectParameters(IParameterCollectionContext context)
72-
{
73-
context.AddParameter("timeout", DbType.Int32, ReceiveTimeout);
74-
}
63+
protected abstract Task ProcessMessage(TMessage message);
7564

7665
private protected virtual async Task ProcessMessages(IEnumerable<TMessage> messages)
7766
{
@@ -92,71 +81,7 @@ private protected virtual async Task ProcessMessages(IEnumerable<TMessage> messa
9281
#region Private Methods
9382
private async Task<ICollection<TMessage>> ReceiveMessageBatch(CancellationToken cancellationToken)
9483
{
95-
using IWorkerScope scope = _scopeFactory.Create();
96-
97-
if (scope.Connection is SqlConnection sqlConnection)
98-
{
99-
void OnInfoMessage(object _, SqlInfoMessageEventArgs e) => Logger.LogDebug($"[SQL] {e.Message}");
100-
sqlConnection.InfoMessage += OnInfoMessage;
101-
}
102-
103-
using DbCommand command = scope.Connection.CreateCommand();
104-
command.CommandText = ReceiveProcedureName;
105-
command.CommandType = CommandType.StoredProcedure;
106-
command.CommandTimeout = CommandTimeout;
107-
108-
IParameterCollectionContext parameterCollectionContext = new ParameterCollectionContext(command);
109-
CollectParameters(parameterCollectionContext);
110-
111-
using (EnterCancellationScope(cancellationToken, command))
112-
{
113-
// Normally we would use ExecuteReaderAsync(cancellationToken) here,
114-
// but our receive procedures are using RAISERROR WITH NOWAIT to report progress in realtime.
115-
// The usage of NOWAIT however seems to block the cancellation.
116-
// Therefore we have to use the sync method and cancel the command ourselves.
117-
// See: https://stackoverflow.com/questions/24738417/canceling-sql-server-query-with-cancellationtoken/24834029#24834029
118-
using (IDataReader reader = await Task.Run(command.ExecuteReader, cancellationToken).ConfigureAwait(false))
119-
{
120-
return reader.Parse<TMessage>().ToArray();
121-
}
122-
}
123-
}
124-
125-
private IDisposable EnterCancellationScope(CancellationToken cancellationToken, IDbCommand command)
126-
{
127-
return cancellationToken.Register(() => HandleCancellationRequest(command));
128-
}
129-
130-
private void HandleCancellationRequest(IDbCommand command)
131-
{
132-
Logger.LogDebug("Cancelling current service broker queue read operation");
133-
command?.Cancel(); // this method throws a SqlException => catch needed!
134-
}
135-
#endregion
136-
137-
#region Nested Types
138-
private sealed class ParameterCollectionContext : IParameterCollectionContext
139-
{
140-
private readonly IDbCommand _command;
141-
142-
public ParameterCollectionContext(IDbCommand command)
143-
{
144-
_command = command;
145-
}
146-
147-
public IParameterCollectionContext AddParameter(string parameterName, DbType parameterType, object value, int? size = null)
148-
{
149-
IDbDataParameter param = _command.CreateParameter();
150-
param.ParameterName = parameterName;
151-
param.DbType = parameterType;
152-
param.Value = value;
153-
154-
if (size != null)
155-
param.Size = size.Value;
156-
157-
_command.Parameters.Add(param);
158-
return this;
159-
}
84+
return (await _serviceBrokerMessageReader.ReadMessages(_fullSubscriberName, ReceiveMessages, cancellationToken).ConfigureAwait(false)).ToArray();
16085
}
16186
#endregion
16287
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Data;
4+
using System.Data.Common;
5+
using System.Linq;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using Dapper;
9+
using Dibix.Hosting.Abstractions.Data;
10+
using Dibix.Worker.Abstractions;
11+
using Microsoft.Extensions.Logging;
12+
using static System.Formats.Asn1.AsnWriter;
13+
14+
namespace Dibix.Worker.Host
15+
{
16+
internal sealed class ServiceBrokerDapperDatabaseAccessorFactory : IDatabaseAccessorFactory
17+
{
18+
private readonly IDatabaseConnectionResolver _connectionResolver;
19+
private readonly ILogger _logger;
20+
21+
public ServiceBrokerDapperDatabaseAccessorFactory(IDatabaseConnectionResolver connectionResolver, CreateDatabaseLogger loggerFactory)
22+
{
23+
_connectionResolver = connectionResolver;
24+
_logger = loggerFactory();
25+
}
26+
27+
public IDatabaseAccessor Create()
28+
{
29+
DbConnection connection = _connectionResolver.Resolve();
30+
return new ServiceBrokerDapperDatabaseAccessor(connection, _logger);
31+
}
32+
}
33+
34+
internal sealed class ServiceBrokerDapperDatabaseAccessor : DatabaseAccessor
35+
{
36+
private readonly ILogger _logger;
37+
38+
public ServiceBrokerDapperDatabaseAccessor(DbConnection connection, ILogger logger) : base(connection)
39+
{
40+
_logger = logger;
41+
}
42+
43+
protected override int Execute(string commandText, CommandType commandType, ParametersVisitor parameters, int? commandTimeout) => throw new NotImplementedException();
44+
45+
protected override Task<int> ExecuteAsync(string commandText, CommandType commandType, ParametersVisitor parameters, int? commandTimeout, CancellationToken cancellationToken) => throw new NotImplementedException();
46+
47+
protected override IEnumerable<T> QueryMany<T>(string commandText, CommandType commandType, ParametersVisitor parameters) => throw new NotImplementedException();
48+
49+
protected override IEnumerable<T> QueryMany<T>(string commandText, CommandType commandType, ParametersVisitor parameters, bool buffered) => throw new NotImplementedException();
50+
51+
protected override async Task<IEnumerable<T>> QueryManyAsync<T>(string commandText, CommandType commandType, ParametersVisitor parameters, bool buffered, CancellationToken cancellationToken)
52+
{
53+
using DbCommand command = Connection.CreateCommand();
54+
command.CommandText = commandText;
55+
command.CommandType = commandType;
56+
command.CommandTimeout = ServiceBrokerDefaults.CommandTimeout;
57+
58+
parameters.VisitInputParameters((name, type, value, isOutput, customInputType) => CollectInputParameter(command, name, type, value, isOutput, customInputType));
59+
60+
using (EnterCancellationScope(cancellationToken, command))
61+
{
62+
// Normally we would use ExecuteReaderAsync(cancellationToken) here,
63+
// but our receive procedures are using RAISERROR WITH NOWAIT to report progress in realtime.
64+
// The usage of NOWAIT however seems to block the cancellation.
65+
// Therefore we have to use the sync method and cancel the command ourselves.
66+
// See: https://stackoverflow.com/questions/24738417/canceling-sql-server-query-with-cancellationtoken/24834029#24834029
67+
ICollection<T> result;
68+
using (IDataReader reader = await Task.Run(command.ExecuteReader, cancellationToken).ConfigureAwait(false))
69+
{
70+
result = reader.Parse<T>().ToArray();
71+
}
72+
parameters.VisitOutputParameters(name => CollectOutputParameter(command, name));
73+
return result;
74+
}
75+
}
76+
77+
protected override IEnumerable<TReturn> QueryMany<TReturn>(string commandText, CommandType commandType, ParametersVisitor parameters, Type[] types, Func<object[], TReturn> map, string splitOn) => throw new NotImplementedException();
78+
79+
protected override T QuerySingleOrDefault<T>(string commandText, CommandType commandType, ParametersVisitor parameters) => throw new NotImplementedException();
80+
81+
protected override Task<T> QuerySingleOrDefaultAsync<T>(string commandText, CommandType commandType, ParametersVisitor parameters, CancellationToken cancellationToken) => throw new NotImplementedException();
82+
83+
protected override IMultipleResultReader QueryMultiple(string commandText, CommandType commandType, ParametersVisitor parameters) => throw new NotImplementedException();
84+
85+
protected override Task<IMultipleResultReader> QueryMultipleAsync(string commandText, CommandType commandType, ParametersVisitor parameters, CancellationToken cancellationToken) => throw new NotImplementedException();
86+
87+
protected override void OnInfoMessage(string message) => _logger.LogDebug($"[SQL] {message}");
88+
89+
protected override void DisposeConnection()
90+
{
91+
// Disposal of the connection is responsibility of the consumer.
92+
// This is currently done by registering this as a scoped instance, that will be disposed after each request.
93+
}
94+
95+
private IDisposable EnterCancellationScope(CancellationToken cancellationToken, IDbCommand command)
96+
{
97+
return cancellationToken.Register(() => HandleCancellationRequest(command));
98+
}
99+
100+
private void HandleCancellationRequest(IDbCommand command)
101+
{
102+
_logger.LogDebug("Cancelling current service broker queue read operation");
103+
command?.Cancel(); // this method throws a SqlException => catch needed!
104+
}
105+
106+
private static void CollectInputParameter(DbCommand command, string name, DbType type, object value, bool isOutput, CustomInputType customInputType)
107+
{
108+
DbParameter parameter = command.CreateParameter();
109+
parameter.ParameterName = name;
110+
parameter.DbType = type;
111+
parameter.Value = value;
112+
parameter.Direction = isOutput ? ParameterDirection.Output : ParameterDirection.Input;
113+
command.Parameters.Add(parameter);
114+
}
115+
116+
private static object? CollectOutputParameter(DbCommand command, string name) => command.Parameters[name].Value;
117+
}
118+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace Dibix.Worker.Host
2+
{
3+
internal static class ServiceBrokerDefaults
4+
{
5+
public const int CommandTimeout = 60; // seconds
6+
public const int ReceiveTimeout = CommandTimeout / 2 * 1000; // ms
7+
}
8+
}

0 commit comments

Comments
 (0)