Skip to content

Commit c072cf8

Browse files
committed
Don't use Dapper in the SqlServerJobQueue class
1 parent 6832549 commit c072cf8

File tree

1 file changed

+71
-39
lines changed

1 file changed

+71
-39
lines changed

Diff for: src/Hangfire.SqlServer/SqlServerJobQueue.cs

+71-39
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
using System.Globalization;
2222
using System.Linq;
2323
using System.Threading;
24-
using Dapper;
2524
using Hangfire.Annotations;
2625
using Hangfire.Storage;
2726

@@ -45,6 +44,8 @@ internal sealed class SqlServerJobQueue : IPersistentJobQueue
4544
new ConcurrentDictionary<Tuple<SqlServerStorage, string>, SemaphoreSlim>();
4645
private static readonly ConcurrentDictionary<KeyValuePair<SqlServerStorage, int>, string> NonBlockingQueriesCache =
4746
new ConcurrentDictionary<KeyValuePair<SqlServerStorage, int>, string>();
47+
private static readonly ConcurrentDictionary<KeyValuePair<SqlServerStorage, int>, string> TransactionalQueriesCache =
48+
new ConcurrentDictionary<KeyValuePair<SqlServerStorage, int>, string>();
4849

4950
private readonly SqlServerStorage _storage;
5051
private readonly SqlServerStorageOptions _options;
@@ -81,13 +82,15 @@ public void Enqueue(DbConnection connection, DbTransaction transaction, string q
8182
var query = _storage.GetQueryFromTemplate(static schemaName =>
8283
$@"insert into [{schemaName}].JobQueue (JobId, Queue) values (@jobId, @queue)");
8384

84-
connection.Execute(
85-
query,
86-
new { jobId = long.Parse(jobId, CultureInfo.InvariantCulture), queue = queue }
85+
var command = ((DbConnection)connection).Create(query, timeout: _storage.CommandTimeout);
86+
command.AddParameter("@jobId", long.Parse(jobId, CultureInfo.InvariantCulture), DbType.Int64);
87+
command.AddParameter("@queue", queue, DbType.String);
88+
8789
#if !FEATURE_TRANSACTIONSCOPE
88-
, transaction
90+
command.Transaction = transaction;
8991
#endif
90-
, commandTimeout: _storage.CommandTimeout);
92+
93+
command.ExecuteNonQuery();
9194
}
9295

9396
private SqlServerTimeoutJob DequeueUsingSlidingInvisibilityTimeout(string[] queues, CancellationToken cancellationToken)
@@ -224,22 +227,15 @@ where Queue in @queues and
224227
for (var i = 0; i < queues.Length; i++)
225228
{
226229
command.AddParameter("@queue" + i, queues[i], DbType.String);
227-
};
230+
}
228231

229232
return command;
230233
}
231234

232235
private SqlServerTransactionJob DequeueUsingTransaction(string[] queues, CancellationToken cancellationToken)
233236
{
234-
FetchedJob fetchedJob = null;
235237
DbTransaction transaction = null;
236238

237-
var query = _storage.GetQueryFromTemplate(static schemaName =>
238-
$@"delete top (1) JQ
239-
output DELETED.Id, DELETED.JobId, DELETED.Queue
240-
from [{schemaName}].JobQueue JQ with (readpast, updlock, rowlock, forceseek)
241-
where Queue in @queues and (FetchedAt is null or FetchedAt < DATEADD(second, @timeout, GETUTCDATE()))");
242-
243239
var pollInterval = _options.QueuePollInterval > TimeSpan.Zero
244240
? _options.QueuePollInterval
245241
: TimeSpan.FromSeconds(1);
@@ -254,28 +250,37 @@ private SqlServerTransactionJob DequeueUsingTransaction(string[] queues, Cancell
254250
{
255251
transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
256252

257-
fetchedJob = connection.QuerySingleOrDefault<FetchedJob>(
258-
query,
259253
#pragma warning disable 618
260-
new { queues = queues, timeout = _options.InvisibilityTimeout.Negate().TotalSeconds },
254+
using var command = CreateTransactionalFetchCommand(_storage, connection, queues, (int)_options.InvisibilityTimeout.Negate().TotalSeconds);
261255
#pragma warning restore 618
262-
transaction,
263-
commandTimeout: _storage.CommandTimeout);
256+
command.Transaction = transaction;
264257

265-
if (fetchedJob != null)
258+
using (var reader = command.ExecuteReader())
266259
{
267-
return new SqlServerTransactionJob(
268-
_storage,
269-
connection,
270-
transaction,
271-
fetchedJob.JobId.ToString(CultureInfo.InvariantCulture),
272-
fetchedJob.Queue);
273-
}
274-
else
275-
{
276-
// Nothing updated, just commit the empty transaction.
277-
transaction.Commit();
260+
if (reader.Read())
261+
{
262+
var jobId = reader.GetInt64(reader.GetOrdinal("JobId"));
263+
var queue = reader.GetString(reader.GetOrdinal("Queue"));
264+
265+
if (reader.Read())
266+
{
267+
throw new InvalidOperationException(
268+
"Multiple rows returned from SQL Server, while expecting single or none.");
269+
}
270+
271+
var result = new SqlServerTransactionJob(_storage, connection, transaction,
272+
jobId.ToString(CultureInfo.InvariantCulture), queue);
273+
274+
// We shouldn't dispose them, because their ownership is now related
275+
// to the SqlServerTransactionJob instance.
276+
connection = null;
277+
transaction = null;
278+
return result;
279+
}
278280
}
281+
282+
// Nothing updated, just commit the empty transaction.
283+
transaction.Commit();
279284
}
280285
catch (Exception ex) when (ex.IsCatchableExceptionType())
281286
{
@@ -289,22 +294,49 @@ private SqlServerTransactionJob DequeueUsingTransaction(string[] queues, Cancell
289294
}
290295
finally
291296
{
292-
if (fetchedJob == null)
293-
{
294-
transaction?.Dispose();
295-
transaction = null;
296-
297-
_storage.ReleaseConnection(connection);
298-
}
297+
transaction?.Dispose();
298+
_storage.ReleaseConnection(connection);
299299
}
300300

301301
WaitHandle.WaitAny(waitArray, pollInterval);
302302
}
303-
303+
304304
cancellationToken.ThrowIfCancellationRequested();
305305
return null;
306306
}
307307

308+
private static DbCommand CreateTransactionalFetchCommand(
309+
SqlServerStorage storage,
310+
DbConnection connection,
311+
string[] queues,
312+
int invisibilityTimeout)
313+
{
314+
var query = TransactionalQueriesCache.GetOrAdd(
315+
new KeyValuePair<SqlServerStorage, int>(storage, queues.Length),
316+
static pair =>
317+
{
318+
var template = pair.Key.GetQueryFromTemplate(static schemaName =>
319+
$@"delete top (1) JQ
320+
output DELETED.Id, DELETED.JobId, DELETED.Queue
321+
from [{schemaName}].JobQueue JQ with (readpast, updlock, rowlock, forceseek)
322+
where Queue in @queues and (FetchedAt is null or FetchedAt < DATEADD(second, @timeout, GETUTCDATE()))");
323+
324+
return template.Replace(
325+
"@queues",
326+
"(" + String.Join(",", Enumerable.Range(0, pair.Value).Select(static i => "@queue" + i.ToString(CultureInfo.InvariantCulture))) + ")");
327+
});
328+
329+
var command = connection.Create(query, timeout: storage.CommandTimeout);
330+
command.AddParameter("@timeout", invisibilityTimeout, DbType.Int32);
331+
332+
for (var i = 0; i < queues.Length; i++)
333+
{
334+
command.AddParameter("@queue" + i, queues[i], DbType.String);
335+
}
336+
337+
return command;
338+
}
339+
308340
private static WaitHandle[] GetWaitArrayForQueueSignals(SqlServerStorage storage, string[] queues, CancellationToken cancellationToken)
309341
{
310342
var waitList = new List<WaitHandle>(capacity: queues.Length + 1)

0 commit comments

Comments
 (0)