Skip to content

Commit 2437771

Browse files
authored
Merge pull request #831 from camunda-community-hub/LennartKleymann-813-added-backoff-to-job-worker
Lennart kleymann 813 added backoff to job worker
2 parents d4e1671 + b00ba31 commit 2437771

File tree

8 files changed

+308
-8
lines changed

8 files changed

+308
-8
lines changed
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using NUnit.Framework;
5+
using Zeebe.Client.Api.Worker;
6+
using Zeebe.Client.Impl.Worker;
7+
8+
namespace Zeebe.Client;
9+
10+
[TestFixture]
11+
public sealed class ExponentialBackoffTests
12+
{
13+
[Test]
14+
public void ShouldReturnDelayWithinBounds_WhenNoJitter()
15+
{
16+
// given
17+
var maxDelay = TimeSpan.FromMilliseconds(1_000);
18+
var minDelay = TimeSpan.FromMilliseconds(50);
19+
IBackoffSupplier supplier = new ExponentialBackoffBuilderImpl()
20+
.MaxDelay(maxDelay)
21+
.MinDelay(minDelay)
22+
.BackoffFactor(1.6)
23+
.JitterFactor(0)
24+
.Build();
25+
26+
var delays = new List<long>();
27+
long current = supplier.SupplyRetryDelay(0);
28+
29+
// when
30+
for (var i = 0; i < 100; i++)
31+
{
32+
delays.Add(current);
33+
current = supplier.SupplyRetryDelay(current);
34+
}
35+
36+
// then - with zero jitter, sequence should monotonically increase until it caps at max
37+
Assert.That(delays, Is.Not.Empty);
38+
Assert.That(delays.First(), Is.EqualTo((long)minDelay.TotalMilliseconds));
39+
Assert.That(delays.Last(), Is.EqualTo((long)maxDelay.TotalMilliseconds));
40+
41+
long previous = -1;
42+
foreach (var delay in delays)
43+
{
44+
if (delay != (long)maxDelay.TotalMilliseconds)
45+
{
46+
Assert.That(delay, Is.GreaterThan(previous));
47+
}
48+
previous = delay;
49+
}
50+
}
51+
52+
[Test]
53+
public void ShouldBeRandomizedWithJitter()
54+
{
55+
// given
56+
var maxDelay = TimeSpan.FromMilliseconds(1_000);
57+
var minDelay = TimeSpan.FromMilliseconds(50);
58+
const double jitterFactor = 0.2;
59+
60+
IBackoffSupplier supplier = new ExponentialBackoffBuilderImpl()
61+
.MaxDelay(maxDelay)
62+
.MinDelay(minDelay)
63+
.BackoffFactor(1.5)
64+
.JitterFactor(jitterFactor)
65+
.Build();
66+
67+
var lowerMaxBound = (long)Math.Round(maxDelay.TotalMilliseconds + (maxDelay.TotalMilliseconds * -jitterFactor));
68+
var upperMaxBound = (long)Math.Round(maxDelay.TotalMilliseconds + (maxDelay.TotalMilliseconds * jitterFactor));
69+
70+
// when - always compute from max to test jitter around the cap
71+
var delays = new List<long>();
72+
for (var i = 0; i < 10; i++)
73+
{
74+
var value = supplier.SupplyRetryDelay((long)maxDelay.TotalMilliseconds);
75+
delays.Add(value);
76+
}
77+
78+
// then
79+
Assert.That(delays, Is.Not.Empty);
80+
foreach (var delay in delays)
81+
{
82+
Assert.That(delay, Is.InRange(lowerMaxBound, upperMaxBound));
83+
}
84+
Assert.That(delays.Distinct().Count(), Is.GreaterThan(1));
85+
}
86+
}
87+
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using System;
2+
3+
namespace Zeebe.Client.Api.Worker;
4+
5+
/// <summary>
6+
/// Supplies the delay for the next retry after a failed activate request.
7+
/// Value is in milliseconds; return value may be zero to retry immediately.
8+
/// </summary>
9+
public interface IBackoffSupplier
10+
{
11+
/// <summary>
12+
/// Returns the delay before the next retry.
13+
/// </summary>
14+
/// <param name="currentRetryDelay">The previously used delay in milliseconds.</param>
15+
/// <returns>The new retry delay in milliseconds.</returns>
16+
long SupplyRetryDelay(long currentRetryDelay);
17+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using System;
2+
3+
namespace Zeebe.Client.Api.Worker;
4+
5+
public interface IExponentialBackoffBuilder
6+
{
7+
/// <summary>
8+
/// Sets the maximum retry delay. Default is 5000ms.
9+
/// </summary>
10+
IExponentialBackoffBuilder MaxDelay(TimeSpan maxDelay);
11+
12+
/// <summary>
13+
/// Sets the minimum retry delay. Default is 50ms.
14+
/// </summary>
15+
IExponentialBackoffBuilder MinDelay(TimeSpan minDelay);
16+
17+
/// <summary>
18+
/// Sets the multiplication factor (previous delay * factor). Default is 1.6.
19+
/// </summary>
20+
IExponentialBackoffBuilder BackoffFactor(double backoffFactor);
21+
22+
/// <summary>
23+
/// Sets optional jitter factor (+/- factor of delay). Default is 0.1.
24+
/// </summary>
25+
IExponentialBackoffBuilder JitterFactor(double jitterFactor);
26+
27+
/// <summary>
28+
/// Sets the random source used to compute jitter. Default is new Random().
29+
/// </summary>
30+
IExponentialBackoffBuilder Random(Random random);
31+
32+
/// <summary>
33+
/// Builds the supplier with the provided configuration.
34+
/// </summary>
35+
IBackoffSupplier Build();
36+
}

Client/Api/Worker/IJobWorkerBuilderStep1.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,14 @@ public interface IJobWorkerBuilderStep3 : ITenantIdsCommandStep<IJobWorkerBuilde
237237
/// <returns>the builder for this worker.</returns>
238238
IJobWorkerBuilderStep3 HandlerThreads(byte threadCount);
239239

240+
/// <summary>
241+
/// Configures a retry backoff supplier used when requests fail.
242+
/// Defaults to an exponential backoff (max 5000ms, min 50ms, factor 1.6, jitter 0.1) if not set.
243+
/// </summary>
244+
/// <param name="backoffSupplier">The supplier used to compute the next retry delay in ms.</param>
245+
/// <returns>The builder for this worker.</returns>
246+
IJobWorkerBuilderStep3 BackoffSupplier(IBackoffSupplier backoffSupplier);
247+
240248
/// <summary>
241249
/// Enable or disable gRPC job streaming for this worker.
242250
/// </summary>
@@ -249,4 +257,4 @@ public interface IJobWorkerBuilderStep3 : ITenantIdsCommandStep<IJobWorkerBuilde
249257
/// </summary>
250258
/// <returns>the worker.</returns>
251259
IJobWorker Open();
252-
}
260+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
using System;
2+
using Zeebe.Client.Api.Worker;
3+
4+
namespace Zeebe.Client.Impl.Worker;
5+
6+
internal sealed class ExponentialBackoffBuilderImpl : IExponentialBackoffBuilder
7+
{
8+
private TimeSpan maxDelay = TimeSpan.FromMilliseconds(5000);
9+
private TimeSpan minDelay = TimeSpan.FromMilliseconds(50);
10+
private double backoffFactor = 1.6;
11+
private double jitterFactor = 0.1;
12+
private Random random = new Random();
13+
14+
public IExponentialBackoffBuilder MaxDelay(TimeSpan maxDelay)
15+
{
16+
this.maxDelay = maxDelay;
17+
return this;
18+
}
19+
20+
public IExponentialBackoffBuilder MinDelay(TimeSpan minDelay)
21+
{
22+
this.minDelay = minDelay;
23+
return this;
24+
}
25+
26+
public IExponentialBackoffBuilder BackoffFactor(double backoffFactor)
27+
{
28+
this.backoffFactor = backoffFactor;
29+
return this;
30+
}
31+
32+
public IExponentialBackoffBuilder JitterFactor(double jitterFactor)
33+
{
34+
this.jitterFactor = jitterFactor;
35+
return this;
36+
}
37+
38+
public IExponentialBackoffBuilder Random(Random random)
39+
{
40+
this.random = random ?? new Random();
41+
return this;
42+
}
43+
44+
public IBackoffSupplier Build()
45+
{
46+
return new ExponentialBackoffSupplier(minDelay, maxDelay, backoffFactor, jitterFactor, random);
47+
}
48+
}
49+
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
using System;
2+
using Zeebe.Client.Api.Worker;
3+
4+
namespace Zeebe.Client.Impl.Worker;
5+
6+
/// <summary>
7+
/// An implementation of <see cref="IBackoffSupplier"/> which uses the **Exponential Backoff with Jitter** strategy
8+
/// for calculating retry delays.
9+
/// <para>
10+
/// The implementation uses a simple formula: it multiplies the previous delay with a backoff multiplier
11+
/// and adds some jitter to avoid multiple clients polling at the same time.
12+
/// </para>
13+
/// </summary>
14+
/// <remarks>
15+
/// The core logic is copied from the Zeebe Java client's
16+
/// <c>io.camunda.zeebe.client.impl.worker.ExponentialBackoff</c> implementation
17+
/// (source: <a href="https://github.com/camunda/camunda/blob/5764a0a3e6c3d3253c5c9608bf4478f8e2281af7/clients/java-deprecated/src/main/java/io/camunda/zeebe/client/impl/worker/ExponentialBackoff.java#L31">GitHub</a>).
18+
/// <para>
19+
/// The next delay is calculated by clamping the multiplied delay between <c>minDelay</c> and <c>maxDelay</c>,
20+
/// and then adding a random jitter:
21+
/// </para>
22+
/// <c>max(min(maxDelay, currentDelay * backoffFactor), minDelay) + jitter</c>
23+
/// <para>The final result is ensured to be non-negative and rounded.</para>
24+
/// </remarks>
25+
internal sealed class ExponentialBackoffSupplier : IBackoffSupplier
26+
{
27+
private readonly double maxDelayMs;
28+
private readonly double minDelayMs;
29+
private readonly double backoffFactor;
30+
private readonly double jitterFactor;
31+
private readonly Random random;
32+
33+
internal ExponentialBackoffSupplier(
34+
TimeSpan minDelay,
35+
TimeSpan maxDelay,
36+
double backoffFactor,
37+
double jitterFactor,
38+
Random random)
39+
{
40+
maxDelayMs = maxDelay.TotalMilliseconds;
41+
minDelayMs = minDelay.TotalMilliseconds;
42+
this.backoffFactor = backoffFactor;
43+
this.jitterFactor = jitterFactor;
44+
this.random = random ?? Random.Shared;
45+
}
46+
47+
public long SupplyRetryDelay(long currentRetryDelay)
48+
{
49+
var previous = (double)currentRetryDelay;
50+
var multiplied = previous * backoffFactor;
51+
var clamped = Math.Max(Math.Min(maxDelayMs, multiplied), minDelayMs);
52+
53+
var range = clamped * jitterFactor;
54+
var jitter = (random.NextDouble() * (2 * range)) - range;
55+
56+
var next = clamped + jitter;
57+
if (next < 0)
58+
{
59+
next = 0;
60+
}
61+
62+
return (long)Math.Round(next);
63+
}
64+
}
65+

0 commit comments

Comments
 (0)