Skip to content

Commit 4618eb5

Browse files
kmakris23jamesmh
andauthored
feat: configurable queue consummation delay (#411)
* feat: configurable queue consummation delay * Update README.md * Add comments and encapsulate method --------- Co-authored-by: James Hickey <jamesmh@users.noreply.github.com>
1 parent 0e17a39 commit 4618eb5

4 files changed

Lines changed: 67 additions & 9 deletions

File tree

DocsV2/docs/Queuing/README.md

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ meta:
1010

1111
[[toc]]
1212

13-
Coravel gives you a zero-configuration queue that runs in-memory.
13+
Coravel gives you a zero-configuration queue that runs in-memory.
1414

1515
This is useful to offload long-winded tasks to the background instead of making your users wait for their HTTP request to finish.
1616

@@ -33,6 +33,7 @@ public HomeController(IQueue queue) {
3333
```
3434

3535
## Queuing Jobs
36+
3637
### Queuing Invocables
3738

3839
To queue an invocable, use `QueueInvocable`:
@@ -93,7 +94,7 @@ Use the `QueueAsyncTask` to queue up an async task:
9394

9495
### Queuing Synchronously
9596

96-
You use the `QueueTask()` method to add a task to the queue.
97+
You use the `QueueTask()` method to add a task to the queue.
9798

9899
```csharp
99100
public IActionResult QueueTask() {
@@ -109,7 +110,7 @@ Event broadcasting is great - but what if your event listeners are doing some he
109110
By using `QueueBroadcast`, you can queue an event to be broadcasted in the background.
110111

111112
```csharp
112-
this._queue.QueueBroadcast(new OrderCreated(orderId));
113+
this._queue.QueueBroadcast(new OrderCreated(orderId));
113114
```
114115

115116
### Queuing Cancellable Invocables
@@ -120,7 +121,7 @@ By using `QueueCancellableInvocable` you can build invocables that can be cancel
120121

121122
```csharp
122123
var (taskGuid, token) = queue.QueueCancellableInvocable<CancellableInvocable>();
123-
124+
124125
// Somewhere else....
125126
126127
token.Cancel();
@@ -138,6 +139,7 @@ while(!this.Token.IsCancellationRequested)
138139
await ProcessNextRecord();
139140
}
140141
```
142+
141143
## Metrics
142144

143145
You can gain some insight into how the queue is doing at a given moment in time.
@@ -153,7 +155,7 @@ Available methods:
153155

154156
## Tracking Task Progress
155157

156-
Most of the methods on the `IQueue` interface will return a `Guid` that represents the unique id for the task you pushed to the queue. Also, Coravel's queue exposes some internal events that you can hook into.
158+
Most of the methods on the `IQueue` interface will return a `Guid` that represents the unique id for the task you pushed to the queue. Also, Coravel's queue exposes some internal events that you can hook into.
157159

158160
Combining these: you can create listeners for the events `QueueTaskStarted` and `QueueTaskCompleted` that verify the progress of specific tasks in real-time. When a task/job crashes, then the event `DequeuedTaskFailed` will be emitted. Creating a listener for this one might be helpful too.
159161

@@ -198,6 +200,19 @@ You can adjust this delay in the `appsettings.json` file.
198200
}
199201
```
200202

203+
Alternatively, you can adjust the consummation delay using `AddQueue`:
204+
205+
```csharp
206+
services.AddQueue(queueOptions => {
207+
// Consume queue every 5 seconds.
208+
queueOptions.ConsummationDelay = 5;
209+
});
210+
```
211+
212+
:::tip
213+
`QueueOptions` will take precedence over your configuration file if both are defined.
214+
:::
215+
201216
## Logging Task Progress
202217

203218
Coravel uses the `ILogger` .NET Core interface to allow logging task progress:

Src/Coravel/QueueServiceRegistration.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,23 @@ public static class QueueServiceRegistration
1919
/// <returns></returns>
2020
public static IServiceCollection AddQueue(this IServiceCollection services)
2121
{
22+
services.AddSingleton<QueueOptions>(new QueueOptions());
23+
services.AddSingleton<IQueue>(p =>
24+
new Queue(
25+
p.GetRequiredService<IServiceScopeFactory>(),
26+
p.GetService<IDispatcher>()
27+
)
28+
);
29+
services.AddHostedService<QueuingHost>();
30+
return services;
31+
}
32+
33+
public static IServiceCollection AddQueue(this IServiceCollection services, Action<QueueOptions> options)
34+
{
35+
var opt = new QueueOptions();
36+
options(opt);
37+
38+
services.AddSingleton<QueueOptions>(opt);
2239
services.AddSingleton<IQueue>(p =>
2340
new Queue(
2441
p.GetRequiredService<IServiceScopeFactory>(),

Src/Coravel/Queuing/HostedService/QueuingHost.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,16 @@ internal class QueuingHost : IHostedService, IDisposable
1616
private Queue _queue;
1717
private IConfiguration _configuration;
1818
private ILogger<QueuingHost> _logger;
19+
private QueueOptions _queueOptions;
1920
private readonly string QueueRunningMessage = "Coravel Queuing service is attempting to close but the queue is still running." +
2021
" App closing (in background) will be prevented until dequeued tasks are completed.";
2122

22-
public QueuingHost(IQueue queue, IConfiguration configuration, ILogger<QueuingHost> logger)
23+
public QueuingHost(IQueue queue, IConfiguration configuration, ILogger<QueuingHost> logger, QueueOptions queueOptions)
2324
{
2425
this._configuration = configuration;
2526
this._queue = queue as Queue;
2627
this._logger = logger;
28+
this._queueOptions = queueOptions;
2729
}
2830

2931
public Task StartAsync(CancellationToken cancellationToken)
@@ -37,9 +39,7 @@ public Task StartAsync(CancellationToken cancellationToken)
3739

3840
private int GetConsummationDelay()
3941
{
40-
var configurationSection = this._configuration.GetSection("Coravel:Queue:ConsummationDelay");
41-
bool couldParseDelay = int.TryParse(configurationSection.Value, out var parsedDelay);
42-
return couldParseDelay ? parsedDelay : 30;
42+
return this._queueOptions.GetConsummationDelay(this._configuration);
4343
}
4444

4545
private async Task ConsumeQueueAsync()
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
using Microsoft.Extensions.Configuration;
2+
using System;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Text;
6+
using System.Threading.Tasks;
7+
8+
namespace Coravel.Queuing
9+
{
10+
public class QueueOptions
11+
{
12+
/// <summary>
13+
/// This determines how often (in seconds) the queue host will consume all pending tasks.
14+
/// </summary>
15+
public int? ConsummationDelay { get; set; }
16+
17+
public int GetConsummationDelay(IConfiguration configuration)
18+
{
19+
if (ConsummationDelay.HasValue) return ConsummationDelay.Value;
20+
21+
var configurationSection = configuration.GetSection("Coravel:Queue:ConsummationDelay");
22+
bool couldParseDelay = int.TryParse(configurationSection.Value, out var parsedDelay);
23+
return couldParseDelay ? parsedDelay : 30;
24+
}
25+
}
26+
}

0 commit comments

Comments
 (0)