Skip to content

Commit d2f6a26

Browse files
authored
Extend ProbingRequestFactory to support Models and Async calls (#2985)
1 parent 7ead1cc commit d2f6a26

File tree

6 files changed

+195
-11
lines changed

6 files changed

+195
-11
lines changed

src/ReverseProxy/Health/ActiveHealthCheckMonitor.Log.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ private static class Log
3030
EventIds.ActiveHealthProbeConstructionFailedOnCluster,
3131
"Construction of an active health probe for destination `{destinationId}` on cluster `{clusterId}` failed.");
3232

33+
private static readonly Action<ILogger, string, string, Exception?> _activeHealthProbeCancelledOnDestination = LoggerMessage.Define<string, string>(
34+
LogLevel.Debug,
35+
EventIds.ActiveHealthProbeCancelledOnDestination,
36+
"Active health probing for destination `{destinationId}` on cluster `{clusterId}` was cancelled.");
37+
3338
private static readonly Action<ILogger, string, Exception?> _startingActiveHealthProbingOnCluster = LoggerMessage.Define<string>(
3439
LogLevel.Debug,
3540
EventIds.StartingActiveHealthProbingOnCluster,
@@ -75,6 +80,11 @@ public static void ActiveHealthProbeConstructionFailedOnCluster(ILogger logger,
7580
_activeHealthProbeConstructionFailedOnCluster(logger, destinationId, clusterId, ex);
7681
}
7782

83+
// Logs that active health probing of the given destination on the given cluster was cancelled.
// No exception is attached: null is passed as the logging delegate's exception argument.
public static void ActiveHealthProbeCancelledOnDestination(ILogger logger, string destinationId, string clusterId)
84+
{
85+
_activeHealthProbeCancelledOnDestination(logger, destinationId, clusterId, null);
86+
}
87+
7888
public static void StartingActiveHealthProbingOnCluster(ILogger logger, string clusterId)
7989
{
8090
_startingActiveHealthProbingOnCluster(logger, clusterId, null);

src/ReverseProxy/Health/ActiveHealthCheckMonitor.cs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,17 @@ private async Task<DestinationProbingResult> ProbeDestinationAsync(ClusterState
172172
probeActivity?.AddTag("proxy.cluster_id", cluster.ClusterId);
173173
probeActivity?.AddTag("proxy.destination_id", destination.DestinationId);
174174

175+
using var cts = new CancellationTokenSource(timeout);
176+
175177
HttpRequestMessage request;
176178
try
177179
{
178-
request = _probingRequestFactory.CreateRequest(cluster.Model, destination.Model);
180+
request = await _probingRequestFactory.CreateRequestAsync(cluster, destination, cts.Token);
181+
}
182+
catch (OperationCanceledException oce) when (!cts.IsCancellationRequested)
183+
{
184+
Log.ActiveHealthProbeCancelledOnDestination(_logger, destination.DestinationId, cluster.ClusterId);
185+
return new DestinationProbingResult(destination, null, oce);
179186
}
180187
catch (Exception ex)
181188
{
@@ -186,8 +193,6 @@ private async Task<DestinationProbingResult> ProbeDestinationAsync(ClusterState
186193
return new DestinationProbingResult(destination, null, ex);
187194
}
188195

189-
using var cts = new CancellationTokenSource(timeout);
190-
191196
try
192197
{
193198
Log.SendingHealthProbeToEndpointOfDestination(_logger, request.RequestUri, destination.DestinationId, cluster.ClusterId);
@@ -198,6 +203,11 @@ private async Task<DestinationProbingResult> ProbeDestinationAsync(ClusterState
198203

199204
return new DestinationProbingResult(destination, response, null);
200205
}
206+
catch (OperationCanceledException oce) when (!cts.IsCancellationRequested)
207+
{
208+
Log.ActiveHealthProbeCancelledOnDestination(_logger, destination.DestinationId, cluster.ClusterId);
209+
return new DestinationProbingResult(destination, null, oce);
210+
}
201211
catch (Exception ex)
202212
{
203213
Log.DestinationProbingFailed(_logger, destination.DestinationId, cluster.ClusterId, ex);

src/ReverseProxy/Health/DefaultProbingRequestFactory.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
using System.Net;
55
using System.Net.Http;
66
using System.Reflection;
7+
using System.Threading;
8+
using System.Threading.Tasks;
79
using Microsoft.AspNetCore.Http;
810
using Microsoft.AspNetCore.Http.Extensions;
911
using Microsoft.Net.Http.Headers;
@@ -39,4 +41,7 @@ public HttpRequestMessage CreateRequest(ClusterModel cluster, DestinationModel d
3941

4042
return request;
4143
}
44+
45+
// Async counterpart of CreateRequest: builds the request synchronously from
// cluster.Model and destination.Model and wraps it in a ValueTask via ValueTask.FromResult.
// cancellationToken is not used by this implementation.
public ValueTask<HttpRequestMessage> CreateRequestAsync(ClusterState cluster, DestinationState destination, CancellationToken cancellationToken = default) =>
46+
ValueTask.FromResult(CreateRequest(cluster.Model, destination.Model));
4247
}

src/ReverseProxy/Health/IProbingRequestFactory.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33

44
using System.Net.Http;
5+
using System.Threading;
6+
using System.Threading.Tasks;
57
using Yarp.ReverseProxy.Model;
68

79
namespace Yarp.ReverseProxy.Health;
@@ -18,4 +20,14 @@ public interface IProbingRequestFactory
1820
/// <param name="destination">The destination being probed.</param>
1921
/// <returns>Probing <see cref="HttpRequestMessage"/>.</returns>
2022
HttpRequestMessage CreateRequest(ClusterModel cluster, DestinationModel destination);
23+
24+
/// <summary>
25+
/// Creates a probing request asynchronously.
26+
/// </summary>
/// <remarks>
/// The default implementation does not use <paramref name="cancellationToken"/>; it calls
/// <see cref="CreateRequest(ClusterModel, DestinationModel)"/> with the <c>Model</c> of
/// <paramref name="cluster"/> and <paramref name="destination"/> and returns the result
/// wrapped by <c>ValueTask.FromResult</c>.
/// </remarks>
27+
/// <param name="cluster">The cluster being probed.</param>
28+
/// <param name="destination">The destination being probed.</param>
29+
/// <param name="cancellationToken">A token to cancel the operation.</param>
30+
/// <returns>Probing <see cref="HttpRequestMessage"/>.</returns>
31+
ValueTask<HttpRequestMessage> CreateRequestAsync(ClusterState cluster, DestinationState destination, CancellationToken cancellationToken = default) =>
32+
ValueTask.FromResult(CreateRequest(cluster.Model, destination.Model));
2133
}

src/ReverseProxy/Utilities/EventIds.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,5 @@ internal static class EventIds
7171
public static readonly EventId DelegationQueueNoLongerExists = new(65, nameof(DelegationQueueNoLongerExists));
7272
public static readonly EventId ForwardingRequestCancelled = new(66, nameof(ForwardingRequestCancelled));
7373
public static readonly EventId DelegationQueueDisposed = new(67, nameof(DelegationQueueDisposed));
74+
public static readonly EventId ActiveHealthProbeCancelledOnDestination = new(68, nameof(ActiveHealthProbeCancelledOnDestination));
7475
}

test/ReverseProxy.Tests/Health/ActiveHealthCheckMonitorTests.cs

Lines changed: 154 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,30 +58,173 @@ public async Task CheckHealthAsync_ActiveHealthCheckIsEnabledForCluster_SendProb
5858
VerifySentProbeAndResult(cluster2, httpClient2, policy1, new[] { ("https://localhost:20000/cluster2/api/health/", 1), ("https://localhost:20001/cluster2/api/health/", 1) });
5959
}
6060

61+
// Runs one health check against a single-destination cluster using a mocked IProbingRequestFactory
// whose request carries the User-Agent "FooBar/9001", then asks VerifySentProbeAndResult to match
// exactly that User-Agent. The theory parameter selects which factory method the mock sets up.
[Theory]
62+
[InlineData(false)] // Test old API (CreateRequest with Models) via default implementation
63+
[InlineData(true)] // Test new API (CreateRequestAsync with State)
64+
public async Task CheckHealthAsync_CustomUserAgentSpecified_UserAgentUnchanged(bool overrideAsyncMethod)
65+
{
66+
var policy = new Mock<IActiveHealthCheckPolicy>();
67+
policy.SetupGet(p => p.Name).Returns("policy");
68+
69+
var requestFactory = new Mock<IProbingRequestFactory>();
70+
71+
// Local helper shared by both branches: a GET request that already has a custom User-Agent.
HttpRequestMessage CreateCustomRequest()
72+
{
73+
var request = new HttpRequestMessage(HttpMethod.Get, "https://localhost:20000/cluster/api/health/");
74+
request.Headers.UserAgent.ParseAdd("FooBar/9001");
75+
return request;
76+
}
77+
78+
if (overrideAsyncMethod)
79+
{
80+
requestFactory.Setup(p => p.CreateRequestAsync(It.IsAny<ClusterState>(), It.IsAny<DestinationState>(), It.IsAny<CancellationToken>()))
81+
.Returns(() => ValueTask.FromResult(CreateCustomRequest()));
82+
}
83+
else
84+
{
85+
// Test the old API - the default implementation of CreateRequestAsync should call this
86+
requestFactory.Setup(p => p.CreateRequest(It.IsAny<ClusterModel>(), It.IsAny<DestinationModel>()))
87+
.Returns(CreateCustomRequest);
88+
89+
// Use default interface implementation for CreateRequestAsync
90+
requestFactory.CallBase = true;
91+
}
92+
93+
var options = Options.Create(new ActiveHealthCheckMonitorOptions());
94+
var monitor = new ActiveHealthCheckMonitor(options, new[] { policy.Object }, requestFactory.Object, new Mock<TimeProvider>().Object, GetLogger());
95+
96+
var httpClient = GetHttpClient();
97+
var cluster = GetClusterInfo("cluster", "policy", true, httpClient.Object, destinationCount: 1);
98+
99+
await monitor.CheckHealthAsync(new[] { cluster });
100+
101+
// The userAgent pattern is anchored, so it matches only the exact value set in CreateCustomRequest.
VerifySentProbeAndResult(cluster, httpClient, policy, new[] { ("https://localhost:20000/cluster/api/health/", 1) }, userAgent: @"^FooBar\/9001$");
102+
}
103+
61104
[Fact]
62-
public async Task CheckHealthAsync_CustomUserAgentSpecified_UserAgentUnchanged()
105+
public async Task CheckHealthAsync_FactoryCancelledExternally_ProbePassedToPolicyWithException()
63106
{
64107
var policy = new Mock<IActiveHealthCheckPolicy>();
65108
policy.SetupGet(p => p.Name).Returns("policy");
66109

110+
var externalCts = new CancellationTokenSource();
67111
var requestFactory = new Mock<IProbingRequestFactory>();
68-
requestFactory.Setup(p => p.CreateRequest(It.IsAny<ClusterModel>(), It.IsAny<DestinationModel>()))
69-
.Returns((ClusterModel cluster, DestinationModel destination) =>
112+
113+
// First destination: factory throws OperationCanceledException (external cancellation)
114+
// Second destination: succeeds normally
115+
var callCount = 0;
116+
requestFactory.Setup(p => p.CreateRequestAsync(It.IsAny<ClusterState>(), It.IsAny<DestinationState>(), It.IsAny<CancellationToken>()))
117+
.Returns<ClusterState, DestinationState, CancellationToken>((cluster, destination, ct) =>
70118
{
71-
var request = new HttpRequestMessage(HttpMethod.Get, "https://localhost:20000/cluster/api/health/");
72-
request.Headers.UserAgent.ParseAdd("FooBar/9001");
73-
return request;
119+
callCount++;
120+
if (callCount == 1)
121+
{
122+
// Simulate external cancellation (not timeout) - throw without the timeout CTS being cancelled
123+
throw new OperationCanceledException(externalCts.Token);
124+
}
125+
126+
return ValueTask.FromResult(new HttpRequestMessage(HttpMethod.Get, $"https://localhost:20000/{destination.DestinationId}/health/"));
74127
});
75128

76-
var options = Options.Create(new ActiveHealthCheckMonitorOptions());
129+
var options = Options.Create(new ActiveHealthCheckMonitorOptions { DefaultTimeout = TimeSpan.FromSeconds(30) });
77130
var monitor = new ActiveHealthCheckMonitor(options, new[] { policy.Object }, requestFactory.Object, new Mock<TimeProvider>().Object, GetLogger());
78131

79132
var httpClient = GetHttpClient();
133+
var cluster = GetClusterInfo("cluster", "policy", true, httpClient.Object, destinationCount: 2);
134+
135+
await monitor.CheckHealthAsync(new[] { cluster });
136+
137+
// Policy should receive 2 results: one with exception (cancelled), one successful
138+
policy.Verify(
139+
p => p.ProbingCompleted(
140+
cluster,
141+
It.Is<IReadOnlyList<DestinationProbingResult>>(r =>
142+
r.Count == 2 &&
143+
r.Any(x => x.Exception is OperationCanceledException) &&
144+
r.Any(x => x.Response != null && x.Response.StatusCode == HttpStatusCode.OK)))
145+
, Times.Once);
146+
policy.Verify(p => p.Name);
147+
policy.VerifyNoOtherCalls();
148+
}
149+
150+
// Runs one health check against a two-destination cluster where the first SendAsync call throws
// OperationCanceledException and the second returns 200 OK, then verifies the policy receives
// both results in a single ProbingCompleted call: one carrying the exception, one successful.
[Fact]
151+
public async Task CheckHealthAsync_SendAsyncCancelledExternally_ProbePassedToPolicyWithException()
152+
{
153+
var policy = new Mock<IActiveHealthCheckPolicy>();
154+
policy.SetupGet(p => p.Name).Returns("policy");
155+
156+
// externalCts is never cancelled in this test; its token is only attached to the thrown exception.
var externalCts = new CancellationTokenSource();
157+
// NOTE(review): the 30-second DefaultTimeout is presumably chosen so the probe timeout cannot fire during the test - confirm.
var options = Options.Create(new ActiveHealthCheckMonitorOptions { DefaultTimeout = TimeSpan.FromSeconds(30) });
158+
var monitor = new ActiveHealthCheckMonitor(options, new[] { policy.Object }, new DefaultProbingRequestFactory(), new Mock<TimeProvider>().Object, GetLogger());
159+
160+
// First destination: SendAsync throws OperationCanceledException (external cancellation)
161+
// Second destination: succeeds normally
162+
var callCount = 0;
163+
var httpClient = new Mock<HttpMessageInvoker>(() => new HttpMessageInvoker(new Mock<HttpMessageHandler>().Object));
164+
httpClient.Setup(c => c.SendAsync(It.IsAny<HttpRequestMessage>(), It.IsAny<CancellationToken>()))
165+
.Returns<HttpRequestMessage, CancellationToken>((request, ct) =>
166+
{
167+
callCount++;
168+
if (callCount == 1)
169+
{
170+
// Simulate external cancellation (not timeout) - throw without the timeout CTS being cancelled
171+
throw new OperationCanceledException(externalCts.Token);
172+
}
173+
174+
return Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK) { Version = request.Version });
175+
});
176+
177+
var cluster = GetClusterInfo("cluster", "policy", true, httpClient.Object, destinationCount: 2);
178+
179+
await monitor.CheckHealthAsync(new[] { cluster });
180+
181+
// Policy should receive 2 results: one with exception (cancelled), one successful
182+
policy.Verify(
183+
p => p.ProbingCompleted(
184+
cluster,
185+
It.Is<IReadOnlyList<DestinationProbingResult>>(r =>
186+
r.Count == 2 &&
187+
r.Any(x => x.Exception is OperationCanceledException) &&
188+
r.Any(x => x.Response != null && x.Response.StatusCode == HttpStatusCode.OK)))
189+
, Times.Once);
190+
policy.Verify(p => p.Name);
191+
policy.VerifyNoOtherCalls();
192+
}
193+
194+
[Fact]
195+
public async Task CheckHealthAsync_TimeoutCancellation_TreatedAsError()
196+
{
197+
var policy = new Mock<IActiveHealthCheckPolicy>();
198+
policy.SetupGet(p => p.Name).Returns("policy");
199+
200+
var options = Options.Create(new ActiveHealthCheckMonitorOptions { DefaultTimeout = TimeSpan.FromMilliseconds(1) });
201+
var monitor = new ActiveHealthCheckMonitor(options, new[] { policy.Object }, new DefaultProbingRequestFactory(), new Mock<TimeProvider>().Object, GetLogger());
202+
203+
var tcs = new TaskCompletionSource<HttpResponseMessage>();
204+
var httpClient = new Mock<HttpMessageInvoker>(() => new HttpMessageInvoker(new Mock<HttpMessageHandler>().Object));
205+
httpClient.Setup(c => c.SendAsync(It.IsAny<HttpRequestMessage>(), It.IsAny<CancellationToken>()))
206+
.Returns<HttpRequestMessage, CancellationToken>((request, ct) =>
207+
{
208+
// Register callback to cancel the TCS when timeout occurs
209+
ct.Register(() => tcs.TrySetCanceled(ct));
210+
return tcs.Task;
211+
});
212+
80213
var cluster = GetClusterInfo("cluster", "policy", true, httpClient.Object, destinationCount: 1);
81214

82215
await monitor.CheckHealthAsync(new[] { cluster });
83216

84-
VerifySentProbeAndResult(cluster, httpClient, policy, new[] { ("https://localhost:20000/cluster/api/health/", 1) }, userAgent: @"^FooBar\/9001$");
217+
// Policy should receive 1 result with an exception (timeout is an error, not skipped)
218+
policy.Verify(
219+
p => p.ProbingCompleted(
220+
cluster,
221+
It.Is<IReadOnlyList<DestinationProbingResult>>(r =>
222+
r.Count == 1 &&
223+
r[0].Response == null &&
224+
r[0].Exception is OperationCanceledException)),
225+
Times.Once);
226+
policy.Verify(p => p.Name);
227+
policy.VerifyNoOtherCalls();
85228
}
86229

87230
[Fact]
@@ -282,6 +425,9 @@ public async Task ProbeCluster_ClusterChanged_StopSendingProbes()
282425

283426
timeProvider.FireAllTimers();
284427

428+
Assert.Equal(2, timeProvider.TimerCount);
429+
timeProvider.VerifyTimer(0, Interval0);
430+
timeProvider.VerifyTimer(1, Interval1);
285431
VerifySentProbeAndResult(cluster0, httpClient0, policy0, new[] { ("https://localhost:20000/cluster0/api/health/", 1), ("https://localhost:20001/cluster0/api/health/", 1) }, policyCallTimes: 1);
286432
VerifySentProbeAndResult(cluster2, httpClient2, policy1, new[] { ("https://localhost:20000/cluster2/api/health/", 1), ("https://localhost:20001/cluster2/api/health/", 1) }, policyCallTimes: 1);
287433

0 commit comments

Comments
 (0)