Skip to content

Commit bd4d040

Browse files
authored
Merge pull request #189 from JPVenson/master
[Feature] Added method for fetching a new job in the job queue
2 parents 3dae7c5 + e0794ec commit bd4d040

File tree

2 files changed

+51
-4
lines changed

2 files changed

+51
-4
lines changed

src/Hangfire.PostgreSql/PostgreSqlJobQueue.cs

+18-4
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,14 @@ public class PostgreSqlJobQueue : IPersistentJobQueue
3737
private readonly PostgreSqlStorageOptions _options;
3838
private readonly PostgreSqlStorage _storage;
3939

40+
private AutoResetEvent SignalDequeue { get; }
41+
42+
public PostgreSqlJobQueue(PostgreSqlStorage storage, PostgreSqlStorageOptions options)
43+
{
44+
_options = options ?? throw new ArgumentNullException(nameof(options));
45+
_storage = storage ?? throw new ArgumentNullException(nameof(storage));
46+
SignalDequeue = new AutoResetEvent(false);
47+
}
4048
public PostgreSqlJobQueue(PostgreSqlStorage storage, PostgreSqlStorageOptions options)
4149
{
4250
_options = options ?? throw new ArgumentNullException(nameof(options));
@@ -50,8 +58,13 @@ public IFetchedJob Dequeue(string[] queues, CancellationToken cancellationToken)
5058
if (_options.UseNativeDatabaseTransactions)
5159
return Dequeue_Transaction(queues, cancellationToken);
5260

53-
return Dequeue_UpdateCount(queues, cancellationToken);
54-
}
61+
/// <summary>
62+
/// Signal the waiting Thread to lookup a new Job
63+
/// </summary>
64+
public void FetchNextJob()
65+
{
66+
SignalDequeue.Set();
67+
}
5568

5669
public void Enqueue(IDbConnection connection, string queue, string jobId)
5770
{
@@ -149,7 +162,8 @@ LIMIT 1
149162
WaitHandle.WaitAny(new[]
150163
{
151164
cancellationToken.WaitHandle,
152-
NewItemInQueueEvent
165+
NewItemInQueueEvent,
166+
SignalDequeue
153167
},
154168
_options.QueuePollInterval);
155169

@@ -253,4 +267,4 @@ private class FetchedJob
253267
public int UpdateCount { get; set; }
254268
}
255269
}
256-
}
270+
}

tests/Hangfire.PostgreSql.Tests/PostgreSqlJobQueueFacts.cs

+33
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
using System.Globalization;
44
using System.Linq;
55
using System.Threading;
6+
using System.Threading.Tasks;
67
using Dapper;
8+
using Hangfire.Storage;
79
using Npgsql;
810
using Xunit;
911

@@ -442,6 +444,37 @@ public void Queues_Should_Support_Long_Queue_Names()
442444
Assert.Equal(name, record.queue.ToString());
443445
});
444446
}
447+
448+
[Fact, CleanDatabase]
449+
public void Queues_Can_Dequeue_On_Signal()
450+
{
451+
UseConnection((connection, storage) =>
452+
{
453+
var queue = CreateJobQueue(storage, false);
454+
IFetchedJob job = null;
455+
//as UseConnection does not support async-await we have to work with Thread.Sleep
456+
457+
Task.Run(() =>
458+
{
459+
//dequeue the job asynchronously
460+
job = queue.Dequeue(new[] {"default"}, CreateTimingOutCancellationToken());
461+
});
462+
//all sleeps are possibly way to high but this ensures that any race condition is unlikely
463+
//to ensure that the task would run
464+
Thread.Sleep(1000);
465+
Assert.Null(job);
466+
//enqueue a job that does not trigger the existing queue to reevaluate its state
467+
queue.Enqueue(connection, "default", "1");
468+
Thread.Sleep(1000);
469+
//the job should still be unset
470+
Assert.Null(job);
471+
//trigger a reevaluation
472+
queue.FetchNextJob();
473+
//wait for the Dequeue to execute and return the next job
474+
Thread.Sleep(1000);
475+
Assert.NotNull(job);
476+
});
477+
}
445478

446479
private void Enqueue_AddsAJobToTheQueue(bool useNativeDatabaseTransactions)
447480
{

0 commit comments

Comments
 (0)