Skip to content

Commit 2c270d8

Browse files
ahydraxvytautask
authored andcommitted
Implemented distributed lock timeout (#44)
1 parent 9a9c702 commit 2c270d8

11 files changed

+480
-330
lines changed

src/Hangfire.PostgreSql/Entities/JobParameter.cs

+3
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919
//
2020
// Special thanks goes to him.
2121

22+
using Hangfire.PostgreSql.Annotations;
23+
2224
namespace Hangfire.PostgreSql.Entities
2325
{
26+
[UsedImplicitly]
2427
internal class JobParameter
2528
{
2629
public int JobId { get; set; }

src/Hangfire.PostgreSql/Entities/Server.cs

+2
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
// Special thanks goes to him.
2121

2222
using System;
23+
using Hangfire.PostgreSql.Annotations;
2324

2425
namespace Hangfire.PostgreSql.Entities
2526
{
27+
[UsedImplicitly]
2628
internal class Server
2729
{
2830
public string Id { get; set; }

src/Hangfire.PostgreSql/Entities/SqlHash.cs

+2
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
// Special thanks goes to him.
2121

2222
using System;
23+
using Hangfire.PostgreSql.Annotations;
2324

2425
namespace Hangfire.PostgreSql.Entities
2526
{
27+
[UsedImplicitly]
2628
internal class SqlHash
2729
{
2830
public int Id { get; set; }

src/Hangfire.PostgreSql/Entities/SqlJob.cs

+3-1
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
// Special thanks goes to him.
2121

2222
using System;
23+
using Hangfire.PostgreSql.Annotations;
2324

2425
namespace Hangfire.PostgreSql.Entities
2526
{
27+
[UsedImplicitly]
2628
internal class SqlJob
2729
{
2830
public int Id { get; set; }
@@ -37,4 +39,4 @@ internal class SqlJob
3739
public string StateReason { get; set; }
3840
public string StateData { get; set; }
3941
}
40-
}
42+
}

src/Hangfire.PostgreSql/Entities/SqlState.cs

+3-1
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
// Special thanks goes to him.
2121

2222
using System;
23+
using Hangfire.PostgreSql.Annotations;
2324

2425
namespace Hangfire.PostgreSql.Entities
2526
{
27+
[UsedImplicitly]
2628
internal class SqlState
2729
{
2830
public int JobId { get; set; }
@@ -31,4 +33,4 @@ internal class SqlState
3133
public DateTime CreatedAt { get; set; }
3234
public string Data { get; set; }
3335
}
34-
}
36+
}

src/Hangfire.PostgreSql/Hangfire.PostgreSql.csproj

+4-1
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@
120120
<ItemGroup>
121121
<EmbeddedResource Include="Install.v6.sql" />
122122
</ItemGroup>
123+
<ItemGroup>
124+
<EmbeddedResource Include="Install.v7.sql" />
125+
</ItemGroup>
123126
<!-- <Import Project="..\Common\Hangfire.targets" /> -->
124127
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
125128
<Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
@@ -130,4 +133,4 @@
130133
<Target Name="AfterBuild">
131134
</Target>
132135
-->
133-
</Project>
136+
</Project>
+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
SET search_path = 'hangfire';
2+
--
3+
-- Table structure for table `Schema`
4+
--
5+
6+
DO
7+
$$
8+
BEGIN
9+
IF EXISTS (SELECT 1 FROM "schema" WHERE "version"::integer >= 7) THEN
10+
RAISE EXCEPTION 'version-already-applied';
11+
END IF;
12+
END
13+
$$;
14+
15+
ALTER TABLE "lock" ADD COLUMN acquired timestamp without time zone;

src/Hangfire.PostgreSql/PostgreSqlDistributedLock.cs

+70-49
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,19 @@
2727

2828
namespace Hangfire.PostgreSql
2929
{
30-
internal class PostgreSqlDistributedLock : IDisposable
30+
internal sealed class PostgreSqlDistributedLock : IDisposable
3131
{
32-
private readonly IDbConnection _connection;
3332
private readonly string _resource;
33+
private readonly IDbConnection _connection;
3434
private readonly PostgreSqlStorageOptions _options;
3535
private bool _completed;
3636

3737
public PostgreSqlDistributedLock(string resource, TimeSpan timeout, IDbConnection connection,
3838
PostgreSqlStorageOptions options)
3939
{
40-
if (String.IsNullOrEmpty(resource)) throw new ArgumentNullException("resource");
41-
if (connection == null) throw new ArgumentNullException("connection");
42-
if (options == null) throw new ArgumentNullException("options");
40+
if (string.IsNullOrEmpty(resource)) throw new ArgumentNullException(nameof(resource));
41+
if (connection == null) throw new ArgumentNullException(nameof(connection));
42+
if (options == null) throw new ArgumentNullException(nameof(options));
4343

4444
_resource = resource;
4545
_connection = connection;
@@ -51,89 +51,117 @@ public PostgreSqlDistributedLock(string resource, TimeSpan timeout, IDbConnectio
5151
PostgreSqlDistributedLock_Init_UpdateCount(resource, timeout, connection, options);
5252
}
5353

54-
public void PostgreSqlDistributedLock_Init_Transaction(string resource, TimeSpan timeout, IDbConnection connection, PostgreSqlStorageOptions options)
54+
private static void PostgreSqlDistributedLock_Init_Transaction(string resource, TimeSpan timeout,
55+
IDbConnection connection, PostgreSqlStorageOptions options)
5556
{
56-
57-
Stopwatch lockAcquiringTime = new Stopwatch();
58-
lockAcquiringTime.Start();
57+
var lockAcquiringTime = Stopwatch.StartNew();
5958

6059
bool tryAcquireLock = true;
6160

6261
while (tryAcquireLock)
6362
{
63+
TryRemoveDeadlock(resource, connection, options);
64+
6465
try
6566
{
6667
int rowsAffected = -1;
67-
using (var trx = _connection.BeginTransaction(IsolationLevel.RepeatableRead))
68+
using (var trx = connection.BeginTransaction(IsolationLevel.RepeatableRead))
6869
{
69-
rowsAffected = _connection.Execute(@"
70-
INSERT INTO """ + _options.SchemaName + @""".""lock""(""resource"")
71-
SELECT @resource
70+
rowsAffected = connection.Execute($@"
71+
INSERT INTO ""{options.SchemaName}"".""lock""(""resource"", ""acquired"")
72+
SELECT @resource, @acquired
7273
WHERE NOT EXISTS (
73-
SELECT 1 FROM """ + _options.SchemaName + @""".""lock""
74+
SELECT 1 FROM ""{options.SchemaName}"".""lock""
7475
WHERE ""resource"" = @resource
7576
);
76-
", new
77-
{
78-
resource = resource
79-
}, trx);
77+
",
78+
new
79+
{
80+
resource = resource,
81+
acquired = DateTime.UtcNow
82+
}, trx);
8083
trx.Commit();
8184
}
8285
if (rowsAffected > 0) return;
8386
}
84-
catch (Exception)
87+
catch
8588
{
8689
}
8790

8891
if (lockAcquiringTime.ElapsedMilliseconds > timeout.TotalMilliseconds)
92+
{
8993
tryAcquireLock = false;
94+
}
9095
else
9196
{
92-
int sleepDuration = (int) (timeout.TotalMilliseconds - lockAcquiringTime.ElapsedMilliseconds);
97+
int sleepDuration = (int)(timeout.TotalMilliseconds - lockAcquiringTime.ElapsedMilliseconds);
9398
if (sleepDuration > 1000) sleepDuration = 1000;
9499
if (sleepDuration > 0)
100+
{
95101
Thread.Sleep(sleepDuration);
102+
}
96103
else
104+
{
97105
tryAcquireLock = false;
106+
}
98107
}
99108
}
100109

101110
throw new PostgreSqlDistributedLockException(
102-
String.Format(
103-
"Could not place a lock on the resource '{0}': {1}.",
104-
_resource,
105-
"Lock timeout"));
111+
$"Could not place a lock on the resource \'{resource}\': Lock timeout.");
106112
}
107113

108-
public void PostgreSqlDistributedLock_Init_UpdateCount(string resource, TimeSpan timeout, IDbConnection connection, PostgreSqlStorageOptions options)
114+
private static void TryRemoveDeadlock(string resource, IDbConnection connection, PostgreSqlStorageOptions options)
109115
{
116+
try
117+
{
118+
using (var transaction = connection.BeginTransaction(IsolationLevel.RepeatableRead))
119+
{
120+
int affected = -1;
121+
122+
affected = connection.Execute($@"DELETE FROM ""{options.SchemaName}"".""lock"" WHERE ""resource"" = @resource AND ""acquired"" < @timeout",
123+
new
124+
{
125+
resource = resource,
126+
timeout = DateTime.UtcNow - options.DistributedLockTimeout
127+
});
110128

111-
Stopwatch lockAcquiringTime = new Stopwatch();
112-
lockAcquiringTime.Start();
129+
transaction.Commit();
130+
}
131+
}
132+
catch
133+
{
134+
}
135+
}
136+
137+
private static void PostgreSqlDistributedLock_Init_UpdateCount(string resource, TimeSpan timeout, IDbConnection connection, PostgreSqlStorageOptions options)
138+
{
139+
var lockAcquiringTime = Stopwatch.StartNew();
113140

114141
bool tryAcquireLock = true;
115142

116143
while (tryAcquireLock)
117144
{
118145
try
119146
{
120-
_connection.Execute(@"
121-
INSERT INTO """ + _options.SchemaName + @""".""lock""(""resource"", ""updatecount"")
122-
SELECT @resource, 0
147+
connection.Execute($@"
148+
INSERT INTO ""{options.SchemaName}"".""lock""(""resource"", ""updatecount"", ""acquired"")
149+
SELECT @resource, 0, @acquired
123150
WHERE NOT EXISTS (
124-
SELECT 1 FROM """ + _options.SchemaName + @""".""lock""
151+
SELECT 1 FROM ""{options.SchemaName}"".""lock""
125152
WHERE ""resource"" = @resource
126153
);
127154
", new
128-
{
129-
resource = resource
130-
});
155+
{
156+
resource = resource,
157+
acquired = DateTime.UtcNow
158+
});
131159
}
132160
catch (Exception)
133161
{
134162
}
135163

136-
int rowsAffected = _connection.Execute(@"UPDATE """ + _options.SchemaName + @""".""lock"" SET ""updatecount"" = 1 WHERE ""updatecount"" = 0");
164+
int rowsAffected = connection.Execute($@"UPDATE ""{options.SchemaName}"".""lock"" SET ""updatecount"" = 1 WHERE ""updatecount"" = 0");
137165

138166
if (rowsAffected > 0) return;
139167

@@ -151,25 +179,20 @@ SELECT 1 FROM """ + _options.SchemaName + @""".""lock""
151179
}
152180

153181
throw new PostgreSqlDistributedLockException(
154-
String.Format(
155-
"Could not place a lock on the resource '{0}': {1}.",
156-
_resource,
157-
"Lock timeout"));
182+
$"Could not place a lock on the resource '{resource}': Lock timeout.");
158183
}
159184

160-
161-
162-
163185
public void Dispose()
164186
{
165187
if (_completed) return;
166188

167189
_completed = true;
168190

169-
int rowsAffected = _connection.Execute(@"
170-
DELETE FROM """ + _options.SchemaName + @""".""lock""
191+
int rowsAffected = _connection.Execute($@"
192+
DELETE FROM ""{_options.SchemaName}"".""lock""
171193
WHERE ""resource"" = @resource;
172-
", new
194+
",
195+
new
173196
{
174197
resource = _resource
175198
});
@@ -178,10 +201,8 @@ DELETE FROM """ + _options.SchemaName + @""".""lock""
178201
if (rowsAffected <= 0)
179202
{
180203
throw new PostgreSqlDistributedLockException(
181-
String.Format(
182-
"Could not release a lock on the resource '{0}'. Lock does not exists.",
183-
_resource));
204+
$"Could not release a lock on the resource '{_resource}'. Lock does not exists.");
184205
}
185206
}
186207
}
187-
}
208+
}

0 commit comments

Comments
 (0)