Skip to content

Commit 666b13e

Browse files
authored
Merge pull request #320 from Tinyakov/feature/jobs_fetching_signals_refact
Fixing Enqueued Job Trigger for Multiple Queues
2 parents b55259c + bcb0ef7 commit 666b13e

File tree

4 files changed

+55
-20
lines changed

4 files changed

+55
-20
lines changed

src/Hangfire.PostgreSql/PostgreSqlJobQueue.cs

+8-10
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class PostgreSqlJobQueue : IPersistentJobQueue
3737
{
3838
private const string JobNotificationChannel = "new_job";
3939

40-
internal static readonly AutoResetEvent _newItemInQueueEvent = new(true);
40+
internal static readonly AutoResetEventRegistry _queueEventRegistry = new();
4141
private readonly PostgreSqlStorage _storage;
4242

4343
public PostgreSqlJobQueue(PostgreSqlStorage storage)
@@ -147,6 +147,12 @@ LIMIT 1
147147
RETURNING ""id"" AS ""Id"", ""jobid"" AS ""JobId"", ""queue"" AS ""Queue"", ""fetchedat"" AS ""FetchedAt"";
148148
";
149149

150+
WaitHandle[] nextFetchIterationWaitHandles = new[] {
151+
cancellationToken.WaitHandle,
152+
SignalDequeue,
153+
JobQueueNotification,
154+
}.Concat(_queueEventRegistry.GetWaitHandles(queues)).ToArray();
155+
150156
do
151157
{
152158
cancellationToken.ThrowIfCancellationRequested();
@@ -181,15 +187,7 @@ LIMIT 1
181187

182188
if (fetchedJob == null)
183189
{
184-
WaitHandle.WaitAny(new[] {
185-
cancellationToken.WaitHandle,
186-
_newItemInQueueEvent,
187-
SignalDequeue,
188-
JobQueueNotification,
189-
},
190-
_storage.Options.QueuePollInterval);
191-
192-
cancellationToken.ThrowIfCancellationRequested();
190+
WaitHandle.WaitAny(nextFetchIterationWaitHandles, _storage.Options.QueuePollInterval);
193191
}
194192
}
195193
while (fetchedJob == null);

src/Hangfire.PostgreSql/PostgreSqlStorage.cs

-10
Original file line numberDiff line numberDiff line change
@@ -320,22 +320,12 @@ internal T UseTransaction<T>(DbConnection dedicatedConnection, Func<DbConnection
320320

321321
T result = func(connection, null);
322322

323-
// TransactionCompleted event is required here, because if this TransactionScope is enlisted within an ambient TransactionScope, the ambient TransactionScope controls when the TransactionScope completes.
324-
Transaction.Current.TransactionCompleted += Current_TransactionCompleted;
325323
transaction.Complete();
326324

327325
return result;
328326
});
329327
}
330328

331-
private static void Current_TransactionCompleted(object sender, TransactionEventArgs e)
332-
{
333-
if (e.Transaction.TransactionInformation.Status == TransactionStatus.Committed)
334-
{
335-
PostgreSqlJobQueue._newItemInQueueEvent.Set();
336-
}
337-
}
338-
339329
internal TransactionScope CreateTransactionScope(IsolationLevel? isolationLevel, TimeSpan? timeout = null)
340330
{
341331
return TransactionHelpers.CreateTransactionScope(isolationLevel, Options.EnableTransactionScopeEnlistment, timeout);

src/Hangfire.PostgreSql/PostgreSqlWriteOnlyTransaction.cs

+7
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class PostgreSqlWriteOnlyTransaction : JobStorageTransaction
3737
{
3838
private readonly Queue<Action<IDbConnection>> _commandQueue = new();
3939
private readonly Func<DbConnection> _dedicatedConnectionFunc;
40+
private readonly List<string> _queuesWithAddedJobs = new();
4041

4142
private readonly PostgreSqlStorage _storage;
4243

@@ -56,6 +57,10 @@ public override void Commit()
5657
command(connection);
5758
}
5859
}, CreateTransactionScope);
60+
61+
// Triggers signals for all queues to which jobs have been added in this transaction
62+
_queuesWithAddedJobs.ForEach(PostgreSqlJobQueue._queueEventRegistry.Set);
63+
_queuesWithAddedJobs.Clear();
5964
}
6065

6166
private TransactionScope CreateTransactionScope()
@@ -134,6 +139,8 @@ public override void AddToQueue(string queue, string jobId)
134139
IPersistentJobQueue persistentQueue = provider.GetJobQueue();
135140

136141
QueueCommand(con => persistentQueue.Enqueue(con, queue, jobId));
142+
143+
_queuesWithAddedJobs.Add(queue);
137144
}
138145

139146
public override void IncrementCounter(string key)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
using System.Collections.Concurrent;
2+
using System.Collections.Generic;
3+
using System.Threading;
4+
5+
namespace Hangfire.PostgreSql.Utils
6+
{
7+
/// <summary>
8+
/// Represents a registry for managing AutoResetEvent instances using event keys.
9+
/// </summary>
10+
public class AutoResetEventRegistry
11+
{
12+
private readonly ConcurrentDictionary<string, AutoResetEvent> _events = new();
13+
14+
/// <summary>
15+
/// Retrieves the wait handles associated with the specified event keys.
16+
/// </summary>
17+
/// <param name="eventKeys">The event keys.</param>
18+
/// <returns>An enumerable of wait handles.</returns>
19+
public IEnumerable<WaitHandle> GetWaitHandles(IEnumerable<string> eventKeys)
20+
{
21+
foreach (string eventKey in eventKeys)
22+
{
23+
AutoResetEvent newHandle = _events.GetOrAdd(eventKey, _ => new AutoResetEvent(false));
24+
yield return newHandle;
25+
}
26+
}
27+
28+
/// <summary>
29+
/// Sets the specified event.
30+
/// </summary>
31+
/// <param name="eventKey">The event key.</param>
32+
public void Set(string eventKey)
33+
{
34+
if (_events.TryGetValue(eventKey, out AutoResetEvent handle))
35+
{
36+
handle.Set();
37+
}
38+
}
39+
}
40+
}

0 commit comments

Comments
 (0)