Skip to content

Commit d572b57

Browse files
authored
Merge pull request #275 from frankhommers/features/237-fix-transactions
Fixed the transaction-related issues
2 parents 2b132a7 + f07e3d6 commit d572b57

12 files changed

+120
-87
lines changed

src/Hangfire.PostgreSql/Hangfire.PostgreSql.csproj

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@
3030

3131
<ItemGroup>
3232
<PackageReference Include="Dapper" Version="2.0.123" />
33-
<PackageReference Include="GitVersion.MsBuild" Version="5.8.1">
33+
<PackageReference Include="GitVersion.MsBuild" Version="5.11.1">
3434
<PrivateAssets>all</PrivateAssets>
3535
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
3636
</PackageReference>
37-
<PackageReference Include="Hangfire.Core" Version="1.7.27" />
37+
<PackageReference Include="Hangfire.Core" Version="1.7.28" />
3838
<PackageReference Include="Microsoft.CSharp" Version="4.7.0" />
3939
</ItemGroup>
4040

src/Hangfire.PostgreSql/PostgreSqlDistributedLock.cs

+37-15
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,12 @@
2323
using System.Data;
2424
using System.Diagnostics;
2525
using System.Threading;
26+
using System.Transactions;
2627
using Dapper;
28+
using Hangfire.Annotations;
2729
using Hangfire.Logging;
2830
using Npgsql;
31+
using IsolationLevel = System.Data.IsolationLevel;
2932

3033
namespace Hangfire.PostgreSql
3134
{
@@ -98,7 +101,7 @@ internal static void Release(IDbConnection connection, string resource, PostgreS
98101

99102
if (rowsAffected <= 0)
100103
{
101-
throw new PostgreSqlDistributedLockException($"Could not release a lock on the resource '{resource}'. Lock does not exists.");
104+
throw new PostgreSqlDistributedLockException($"Could not release a lock on the resource '{resource}'. Lock does not exist.");
102105
}
103106
}
104107

@@ -109,22 +112,24 @@ public static void Lock(string resource, TimeSpan timeout, IDbConnection connect
109112
Stopwatch lockAcquiringTime = Stopwatch.StartNew();
110113

111114
bool tryAcquireLock = true;
115+
Exception lastException = null;
112116

113117
while (tryAcquireLock)
114118
{
119+
lastException = null;
115120
if (connection.State != ConnectionState.Open)
116121
{
117122
connection.Open();
118123
}
119124

120125
TryRemoveLock(resource, connection, options);
121126

127+
IDbTransaction trx = null;
122128
try
123129
{
124-
int rowsAffected;
125-
using (IDbTransaction trx = connection.BeginTransaction(IsolationLevel.RepeatableRead))
126-
{
127-
rowsAffected = connection.Execute($@"
130+
trx = BeginTransactionIfNotPresent(connection);
131+
132+
int rowsAffected = connection.Execute($@"
128133
INSERT INTO ""{options.SchemaName}"".""lock""(""resource"", ""acquired"")
129134
SELECT @Resource, @Acquired
130135
WHERE NOT EXISTS (
@@ -133,12 +138,11 @@ SELECT 1 FROM ""{options.SchemaName}"".""lock""
133138
)
134139
ON CONFLICT DO NOTHING;
135140
",
136-
new {
137-
Resource = resource,
138-
Acquired = DateTime.UtcNow,
139-
}, trx);
140-
trx.Commit();
141-
}
141+
new {
142+
Resource = resource,
143+
Acquired = DateTime.UtcNow,
144+
}, trx);
145+
trx?.Commit();
142146

143147
if (rowsAffected > 0)
144148
{
@@ -147,8 +151,13 @@ SELECT 1 FROM ""{options.SchemaName}"".""lock""
147151
}
148152
catch (Exception ex)
149153
{
154+
lastException = ex;
150155
Log(resource, "Failed to lock with transaction", ex);
151156
}
157+
finally
158+
{
159+
trx?.Dispose();
160+
}
152161

153162
if (lockAcquiringTime.ElapsedMilliseconds > timeout.TotalMilliseconds)
154163
{
@@ -173,26 +182,39 @@ SELECT 1 FROM ""{options.SchemaName}"".""lock""
173182
}
174183
}
175184

176-
throw new PostgreSqlDistributedLockException($@"Could not place a lock on the resource '{resource}': Lock timeout.");
185+
throw new PostgreSqlDistributedLockException($@"Could not place a lock on the resource '{resource}': Lock timeout.", lastException);
177186
}
178187

179188
private static void TryRemoveLock(string resource, IDbConnection connection, PostgreSqlStorageOptions options)
180189
{
190+
IDbTransaction trx = null;
181191
try
182192
{
183-
using IDbTransaction transaction = connection.BeginTransaction(IsolationLevel.RepeatableRead);
193+
trx = BeginTransactionIfNotPresent(connection);
184194
connection.Execute($@"DELETE FROM ""{options.SchemaName}"".""lock"" WHERE ""resource"" = @Resource AND ""acquired"" < @Timeout",
185195
new {
186196
Resource = resource,
187197
Timeout = DateTime.UtcNow - options.DistributedLockTimeout,
188-
}, transaction: transaction);
198+
}, trx);
189199

190-
transaction.Commit();
200+
trx?.Commit();
191201
}
192202
catch (Exception ex)
193203
{
194204
Log(resource, "Failed to remove lock", ex);
195205
}
206+
finally
207+
{
208+
trx?.Dispose();
209+
}
210+
}
211+
212+
[CanBeNull]
213+
private static IDbTransaction BeginTransactionIfNotPresent(IDbConnection connection)
214+
{
215+
// If transaction scope was created outside of hangfire, the newly-opened connection is automatically enlisted into the transaction.
216+
// Starting a new transaction throws "A transaction is already in progress; nested/concurrent transactions aren't supported." in that case.
217+
return Transaction.Current == null ? connection.BeginTransaction(IsolationLevel.ReadCommitted) : null;
196218
}
197219
}
198220

src/Hangfire.PostgreSql/PostgreSqlDistributedLockException.cs

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// This file is part of Hangfire.PostgreSql.
1+
// This file is part of Hangfire.PostgreSql.
22
// Copyright © 2014 Frank Hommers <http://hmm.rs/Hangfire.PostgreSql>.
33
//
44
// Hangfire.PostgreSql is free software: you can redistribute it and/or modify
@@ -29,5 +29,9 @@ public class PostgreSqlDistributedLockException : Exception
2929
public PostgreSqlDistributedLockException(string message) : base(message)
3030
{
3131
}
32+
33+
public PostgreSqlDistributedLockException(string message, Exception innerException) : base(message, innerException)
34+
{
35+
}
3236
}
3337
}

src/Hangfire.PostgreSql/PostgreSqlStorage.cs

+12-42
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
using System.Transactions;
2828
using Hangfire.Annotations;
2929
using Hangfire.Logging;
30+
using Hangfire.PostgreSql.Utils;
3031
using Hangfire.Server;
3132
using Hangfire.Storage;
3233
using Npgsql;
@@ -119,13 +120,9 @@ public PostgreSqlStorage(NpgsqlConnection existingConnection, PostgreSqlStorageO
119120
}
120121

121122
NpgsqlConnectionStringBuilder connectionStringBuilder = new(existingConnection.ConnectionString);
122-
123-
if (!options.EnableTransactionScopeEnlistment)
123+
if (!options.EnableTransactionScopeEnlistment && connectionStringBuilder.Enlist)
124124
{
125-
if (connectionStringBuilder.Enlist)
126-
{
127-
throw new ArgumentException($"TransactionScope enlistment must be enabled by setting {nameof(PostgreSqlStorageOptions)}.{nameof(options.EnableTransactionScopeEnlistment)} to `true`.");
128-
}
125+
throw new ArgumentException($"TransactionScope enlistment must be enabled by setting {nameof(PostgreSqlStorageOptions)}.{nameof(options.EnableTransactionScopeEnlistment)} to `true`.");
129126
}
130127

131128
_existingConnection = existingConnection;
@@ -134,23 +131,8 @@ public PostgreSqlStorage(NpgsqlConnection existingConnection, PostgreSqlStorageO
134131
InitializeQueueProviders();
135132
}
136133

137-
public PostgreSqlStorage(NpgsqlConnection existingConnection)
134+
public PostgreSqlStorage(NpgsqlConnection existingConnection) : this(existingConnection, new PostgreSqlStorageOptions())
138135
{
139-
if (existingConnection == null)
140-
{
141-
throw new ArgumentNullException(nameof(existingConnection));
142-
}
143-
144-
NpgsqlConnectionStringBuilder connectionStringBuilder = new(existingConnection.ConnectionString);
145-
if (connectionStringBuilder.Enlist)
146-
{
147-
throw new ArgumentException($"TransactionScope enlistment must be enabled by setting {nameof(PostgreSqlStorageOptions)}.{nameof(PostgreSqlStorageOptions.EnableTransactionScopeEnlistment)} to `true`.");
148-
}
149-
150-
_existingConnection = existingConnection;
151-
Options = new PostgreSqlStorageOptions();
152-
153-
InitializeQueueProviders();
154136
}
155137

156138
public PersistentJobQueueProviderCollection QueueProviders { get; internal set; }
@@ -205,16 +187,6 @@ public override string ToString()
205187
}
206188
}
207189

208-
/// <summary>
209-
/// Timezone must be UTC for compatibility with Npgsql 6 and our usage of "timestamp without time zone" columns
210-
/// See https://github.com/frankhommers/Hangfire.PostgreSql/issues/221
211-
/// </summary>
212-
/// <param name="connectionStringBuilder">The ConnectionStringBuilder to set the Timezone property for</param>
213-
internal static void SetTimezoneToUtcForNpgsqlCompatibility(NpgsqlConnectionStringBuilder connectionStringBuilder)
214-
{
215-
connectionStringBuilder.Timezone = "UTC";
216-
}
217-
218190
internal NpgsqlConnection CreateAndOpenConnection()
219191
{
220192
NpgsqlConnection connection;
@@ -225,12 +197,13 @@ internal NpgsqlConnection CreateAndOpenConnection()
225197

226198
if (!Options.EnableTransactionScopeEnlistment)
227199
{
228-
NpgsqlConnectionStringBuilder connectionStringBuilder;
200+
NpgsqlConnectionStringBuilder connectionStringBuilder =
229201
#if !USING_NPGSQL_VERSION_5
230-
connectionStringBuilder = connection.Settings;
202+
connection.Settings;
231203
#else
232-
connectionStringBuilder = new(connection.ConnectionString);
204+
new(connection.ConnectionString);
233205
#endif
206+
234207
if (connectionStringBuilder.Enlist)
235208
{
236209
throw new ArgumentException(
@@ -284,7 +257,7 @@ internal T UseTransaction<T>(DbConnection dedicatedConnection,
284257

285258
if (!EnvironmentHelpers.IsMono())
286259
{
287-
using TransactionScope transaction = CreateTransaction(isolationLevel);
260+
using TransactionScope transaction = CreateTransactionScope(isolationLevel);
288261
T result = UseConnection(dedicatedConnection, connection => {
289262
connection.EnlistTransaction(Transaction.Current);
290263
return func(connection, null);
@@ -358,12 +331,9 @@ private static void Current_TransactionCompleted(object sender, TransactionEvent
358331
}
359332
}
360333

361-
private static TransactionScope CreateTransaction(IsolationLevel? isolationLevel)
334+
internal TransactionScope CreateTransactionScope(IsolationLevel? isolationLevel, TimeSpan? timeout = null)
362335
{
363-
return isolationLevel != null
364-
? new TransactionScope(TransactionScopeOption.Required,
365-
new TransactionOptions { IsolationLevel = isolationLevel.Value })
366-
: new TransactionScope();
336+
return TransactionHelpers.CreateTransactionScope(isolationLevel, Options.EnableTransactionScopeEnlistment, timeout);
367337
}
368338

369339
private static System.Data.IsolationLevel? ConvertIsolationLevel(IsolationLevel? isolationLevel)
@@ -448,7 +418,7 @@ private NpgsqlConnectionStringBuilder SetupConnectionStringBuilderParameters(Npg
448418
if (!Options.EnableTransactionScopeEnlistment)
449419
{
450420
builder.Enlist = false;
451-
SetTimezoneToUtcForNpgsqlCompatibility(builder);
421+
builder.SetTimezoneToUtcForNpgsqlCompatibility();
452422
}
453423

454424
return builder;

src/Hangfire.PostgreSql/PostgreSqlStorageOptions.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public PostgreSqlStorageOptions()
4747
AllowUnsafeValues = false;
4848
UseNativeDatabaseTransactions = true;
4949
PrepareSchemaIfNecessary = true;
50+
EnableTransactionScopeEnlistment = true;
5051
DeleteExpiredBatchSize = 1000;
5152
}
5253

@@ -104,7 +105,7 @@ public TimeSpan CountersAggregateInterval
104105
_countersAggregateInterval = value;
105106
}
106107
}
107-
108+
108109
/// <summary>
109110
/// Gets or sets the number of records deleted in a single batch in expiration manager
110111
/// </summary>

src/Hangfire.PostgreSql/PostgreSqlWriteOnlyTransaction.cs

+1-19
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
using Hangfire.Common;
3131
using Hangfire.States;
3232
using Hangfire.Storage;
33-
using IsolationLevel = System.Transactions.IsolationLevel;
3433

3534
namespace Hangfire.PostgreSql
3635
{
@@ -61,24 +60,7 @@ public override void Commit()
6160

6261
private TransactionScope CreateTransactionScope()
6362
{
64-
IsolationLevel isolationLevel = IsolationLevel.ReadCommitted;
65-
TransactionScopeOption scopeOption = TransactionScopeOption.RequiresNew;
66-
if (_storage.Options.EnableTransactionScopeEnlistment)
67-
{
68-
Transaction currentTransaction = Transaction.Current;
69-
if (currentTransaction != null)
70-
{
71-
isolationLevel = currentTransaction.IsolationLevel;
72-
scopeOption = TransactionScopeOption.Required;
73-
}
74-
}
75-
76-
TransactionOptions transactionOptions = new() {
77-
IsolationLevel = isolationLevel,
78-
Timeout = TransactionManager.MaximumTimeout,
79-
};
80-
81-
return new TransactionScope(scopeOption, transactionOptions);
63+
return _storage.CreateTransactionScope(null, TransactionManager.MaximumTimeout);
8264
}
8365

8466
public override void ExpireJob(string jobId, TimeSpan expireIn)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
using Npgsql;
2+
3+
namespace Hangfire.PostgreSql.Utils
4+
{
5+
internal static class ConnectionStringBuilderExtensions
6+
{
7+
/// <summary>
8+
/// Timezone must be UTC for compatibility with Npgsql 6 and our usage of "timestamp without time zone" columns
9+
/// See https://github.com/frankhommers/Hangfire.PostgreSql/issues/221
10+
/// </summary>
11+
/// <param name="connectionStringBuilder">The ConnectionStringBuilder to set the Timezone property for</param>
12+
internal static void SetTimezoneToUtcForNpgsqlCompatibility(this NpgsqlConnectionStringBuilder connectionStringBuilder)
13+
{
14+
if (connectionStringBuilder == null)
15+
{
16+
return;
17+
}
18+
19+
connectionStringBuilder.Timezone = "UTC";
20+
}
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
using System;
2+
using System.Transactions;
3+
4+
namespace Hangfire.PostgreSql.Utils
5+
{
6+
public static class TransactionHelpers
7+
{
8+
internal static TransactionScope CreateTransactionScope(IsolationLevel? isolationLevel = IsolationLevel.ReadCommitted, bool enlist = true, TimeSpan? timeout = null)
9+
{
10+
TransactionScopeOption scopeOption = TransactionScopeOption.RequiresNew;
11+
if (enlist)
12+
{
13+
Transaction currentTransaction = Transaction.Current;
14+
if (currentTransaction != null)
15+
{
16+
isolationLevel = currentTransaction.IsolationLevel;
17+
scopeOption = TransactionScopeOption.Required;
18+
}
19+
}
20+
21+
return new TransactionScope(
22+
scopeOption,
23+
new TransactionOptions {
24+
IsolationLevel = isolationLevel.GetValueOrDefault(IsolationLevel.ReadCommitted),
25+
Timeout = timeout.GetValueOrDefault(TransactionManager.DefaultTimeout),
26+
});
27+
}
28+
}
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
using Xunit;
2+
3+
// Running tests in parallel actually takes more time when we are cleaning the database for majority of tests. It's quicker to just let them run sequentially.
4+
[assembly: CollectionBehavior(CollectionBehavior.CollectionPerAssembly, DisableTestParallelization = true)]

0 commit comments

Comments
 (0)