Skip to content

Commit 11eb27a

Browse files
authored
Enqueueing a job does not trigger a prepared transaction anymore when using transaction scope enlistment (#160)
* Added test for prepared transactions * Enqueueing a job does not trigger a prepared transaction anymore when using transaction scope enlistment Co-authored-by: Johannes Hartmann <[email protected]>
1 parent 5f1f7cb commit 11eb27a

File tree

5 files changed

+41
-10
lines changed

5 files changed

+41
-10
lines changed

src/Hangfire.PostgreSql/IPersistentJobQueue.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
//
2020
// Special thanks goes to him.
2121

22+
using System.Data;
2223
using System.Threading;
2324
using Hangfire.Storage;
2425

@@ -27,6 +28,6 @@ namespace Hangfire.PostgreSql
2728
public interface IPersistentJobQueue
2829
{
2930
IFetchedJob Dequeue(string[] queues, CancellationToken cancellationToken);
30-
void Enqueue(string queue, string jobId);
31+
void Enqueue(IDbConnection connection, string queue, string jobId);
3132
}
3233
}

src/Hangfire.PostgreSql/PostgreSqlJobQueue.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -229,14 +229,14 @@ ORDER BY
229229
markJobAsFetched.Queue);
230230
}
231231

232-
public void Enqueue(string queue, string jobId)
232+
public void Enqueue(IDbConnection connection, string queue, string jobId)
233233
{
234234
string enqueueJobSql = @"
235235
INSERT INTO """ + _options.SchemaName + @""".""jobqueue"" (""jobid"", ""queue"")
236236
VALUES (@jobId, @queue);
237237
";
238238

239-
_storage.UseConnection(connection => connection.Execute(enqueueJobSql, new {jobId = Convert.ToInt32(jobId, CultureInfo.InvariantCulture), queue = queue}));
239+
connection.Execute(enqueueJobSql, new {jobId = Convert.ToInt32(jobId, CultureInfo.InvariantCulture), queue = queue});
240240
}
241241

242242
[UsedImplicitly(ImplicitUseTargetFlags.WithMembers)]

src/Hangfire.PostgreSql/PostgreSqlWriteOnlyTransaction.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ public override void AddToQueue(string queue, string jobId)
169169
var provider = _queueProviders.GetProvider(queue);
170170
var persistentQueue = provider.GetJobQueue();
171171

172-
QueueCommand((con) => persistentQueue.Enqueue(queue, jobId));
172+
QueueCommand((con) => persistentQueue.Enqueue(con, queue, jobId));
173173
}
174174

175175
public override void IncrementCounter(string key)

tests/Hangfire.PostgreSql.Tests/PostgreSqlJobQueueFacts.cs

+5-5
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ public void Dequeue_ShouldFetchAJob_FromQueueWithHigherPriority()
5353
var queue = CreateJobQueue(storage, false);
5454
var token = CreateTimingOutCancellationToken();
5555

56-
queue.Enqueue("1", "1");
57-
queue.Enqueue("2", "2");
58-
queue.Enqueue("3", "3");
56+
queue.Enqueue(connection, "1", "1");
57+
queue.Enqueue(connection, "2", "2");
58+
queue.Enqueue(connection, "3", "3");
5959

6060
Assert.Equal("1", queue.Dequeue(new[] { "1", "2", "3" }, token).JobId);
6161
Assert.Equal("2", queue.Dequeue(new[] { "2", "3", "1" }, token).JobId);
@@ -436,7 +436,7 @@ public void Queues_Should_Support_Long_Queue_Names()
436436

437437
Assert.True(name.Length > 21);
438438

439-
queue.Enqueue(name, "1");
439+
queue.Enqueue(connection, name, "1");
440440

441441
var record = connection.Query(@"select * from """ + GetSchemaName() + @""".""jobqueue""").Single();
442442
Assert.Equal(name, record.queue.ToString());
@@ -449,7 +449,7 @@ private void Enqueue_AddsAJobToTheQueue(bool useNativeDatabaseTransactions)
449449
{
450450
var queue = CreateJobQueue(storage, useNativeDatabaseTransactions);
451451

452-
queue.Enqueue("default", "1");
452+
queue.Enqueue(connection, "default", "1");
453453

454454
var record = connection.Query(@"select * from """ + GetSchemaName() + @""".""jobqueue""").Single();
455455
Assert.Equal("1", record.jobid.ToString());

tests/Hangfire.PostgreSql.Tests/PostgreSqlWriteOnlyTransactionFacts.cs

+31-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Linq;
66
using System.Transactions;
77
using Dapper;
8+
using Hangfire.Common;
89
using Hangfire.States;
910
using Moq;
1011
using Npgsql;
@@ -265,7 +266,7 @@ public void AddToQueue_CallsEnqueue_OnTargetPersistentQueue()
265266

266267
Commit(sql, x => x.AddToQueue("default", "1"));
267268

268-
correctJobQueue.Verify(x => x.Enqueue("default", "1"));
269+
correctJobQueue.Verify(x => x.Enqueue(sql, "default", "1"));
269270
});
270271
}
271272

@@ -1051,6 +1052,35 @@ public void PersistList_ClearsExpirationTime_OnAGivenHash()
10511052
});
10521053
}
10531054

1055+
[Fact, CleanDatabase]
1056+
public void AddToQueue_AddsAJobToTheQueue_UsingStorageConnection_WithTransactionScopeEnlistment()
1057+
{
1058+
string jobId;
1059+
var storage = new PostgreSqlStorage(ConnectionUtils.GetConnectionString(), new PostgreSqlStorageOptions { EnableTransactionScopeEnlistment = true });
1060+
using (var storageConnection = storage.GetConnection())
1061+
{
1062+
using (var writeTransaction = storageConnection.CreateWriteTransaction())
1063+
{
1064+
// Explicitly call multiple write commands here, as AddToQueue previously opened an own connection.
1065+
// This triggered a prepared transaction which should be avoided.
1066+
jobId = storageConnection.CreateExpiredJob(Job.FromExpression(() => Console.Write("Hi")), new Dictionary<string, string>(), DateTime.UtcNow, TimeSpan.FromMinutes(1));
1067+
1068+
writeTransaction.SetJobState(jobId, new ScheduledState(DateTime.UtcNow));
1069+
writeTransaction.AddToQueue("default", jobId);
1070+
writeTransaction.PersistJob(jobId);
1071+
writeTransaction.Commit();
1072+
}
1073+
}
1074+
1075+
UseConnection(connection =>
1076+
{
1077+
var record = connection.Query(@"select * from """ + GetSchemaName() + @""".""jobqueue""").Single();
1078+
Assert.Equal(jobId, record.jobid.ToString());
1079+
Assert.Equal("default", record.queue);
1080+
Assert.Null(record.FetchedAt);
1081+
});
1082+
}
1083+
10541084
private void UseConnection(Action<NpgsqlConnection> action)
10551085
{
10561086
using (var connection = ConnectionUtils.CreateConnection())

0 commit comments

Comments
 (0)