Skip to content

Commit 93cbf93

Browse files
committed
Fixed PostgreSqlJobQueue.Dequeue queue prioritization
1 parent 803ce7f commit 93cbf93

File tree

2 files changed

+35
-7
lines changed

2 files changed

+35
-7
lines changed

src/Hangfire.PostgreSql.NetCore/PostgreSqlJobQueue.cs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,15 @@ public IFetchedJob Dequeue_Transaction(string[] queues, CancellationToken cancel
7171
SET ""fetchedat"" = NOW() AT TIME ZONE 'UTC'
7272
WHERE ""id"" IN (
7373
SELECT ""id""
74-
FROM """ + _options.SchemaName + @""".""jobqueue""
74+
FROM """ + _options.SchemaName + $@""".""jobqueue""
7575
WHERE ""queue"" = ANY (@queues)
76-
AND ""fetchedat"" {0}
77-
ORDER BY ""fetchedat"", ""jobid""
76+
AND ""fetchedat"" {{0}}
77+
ORDER BY
78+
CASE ""queue""
79+
{string.Join("\n", queues.Select((q, i) => $"WHEN '{q}' THEN {i}"))}
80+
ELSE {queues.Length}
81+
END,
82+
""fetchedat"", ""jobid""
7883
LIMIT 1
7984
)
8085
RETURNING ""id"" AS ""Id"", ""jobid"" AS ""JobId"", ""queue"" AS ""Queue"", ""fetchedat"" AS ""FetchedAt"";
@@ -157,10 +162,15 @@ public IFetchedJob Dequeue_UpdateCount(string[] queues, CancellationToken cancel
157162

158163
string jobToFetchSqlTemplate = @"
159164
SELECT ""id"" AS ""Id"", ""jobid"" AS ""JobId"", ""queue"" AS ""Queue"", ""fetchedat"" AS ""FetchedAt"", ""updatecount"" AS ""UpdateCount""
160-
FROM """ + _options.SchemaName + @""".""jobqueue""
165+
FROM """ + _options.SchemaName + $@""".""jobqueue""
161166
WHERE ""queue"" = ANY (@queues)
162-
AND ""fetchedat"" {0}
163-
ORDER BY ""fetchedat"", ""jobid""
167+
AND ""fetchedat"" {{0}}
168+
ORDER BY
169+
CASE ""queue""
170+
{string.Join("\n", queues.Select((q, i) => $"WHEN '{q}' THEN {i}"))}
171+
ELSE {queues.Length}
172+
END,
173+
""fetchedat"", ""jobid""
164174
LIMIT 1;
165175
";
166176

tests/Hangfire.PostgreSql.NetCore.Tests/PostgreSqlJobQueueFacts.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,25 @@ public void Dequeue_ShouldThrowAnException_WhenQueuesCollectionIsNull()
4646
});
4747
}
4848

49-
[Fact, CleanDatabase]
49+
[Fact, CleanDatabase]
50+
public void Dequeue_ShouldFetchAJob_FromQueueWithHigherPriority()
51+
{
52+
UseConnection(connection =>
53+
{
54+
var queue = CreateJobQueue(connection, false);
55+
var token = CreateTimingOutCancellationToken();
56+
57+
queue.Enqueue("1", "1");
58+
queue.Enqueue("2", "2");
59+
queue.Enqueue("3", "3");
60+
61+
Assert.Equal("1", queue.Dequeue(new[] { "1", "2", "3" }, token).JobId);
62+
Assert.Equal("2", queue.Dequeue(new[] { "2", "3", "1" }, token).JobId);
63+
Assert.Equal("3", queue.Dequeue(new[] { "3", "1", "2" }, token).JobId);
64+
});
65+
}
66+
67+
[Fact, CleanDatabase]
5068
private void Dequeue_ShouldThrowAnException_WhenQueuesCollectionIsEmpty_WithUseNativeDatabaseTransactions()
5169
{
5270
Dequeue_ShouldThrowAnException_WhenQueuesCollectionIsEmpty(true);

0 commit comments

Comments
 (0)