Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/ReverseProxy/Health/ActiveHealthCheckMonitor.Log.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ private static class Log
EventIds.ActiveHealthProbeConstructionFailedOnCluster,
"Construction of an active health probe for destination `{destinationId}` on cluster `{clusterId}` failed.");

// Pre-compiled Debug-level log message emitted when active health probing for a destination is cancelled.
// Template arguments, in order: destination id, cluster id. The trailing Exception? argument of the
// delegate is the optional exception slot that LoggerMessage.Define always provides.
private static readonly Action<ILogger, string, string, Exception?> _activeHealthProbeCancelledOnDestination = LoggerMessage.Define<string, string>(
LogLevel.Debug,
EventIds.ActiveHealthProbeCancelledOnDestination,
"Active health probing for destination `{destinationId}` on cluster `{clusterId}` was cancelled.");

private static readonly Action<ILogger, string, Exception?> _startingActiveHealthProbingOnCluster = LoggerMessage.Define<string>(
LogLevel.Debug,
EventIds.StartingActiveHealthProbingOnCluster,
Expand Down Expand Up @@ -75,6 +80,11 @@ public static void ActiveHealthProbeConstructionFailedOnCluster(ILogger logger,
_activeHealthProbeConstructionFailedOnCluster(logger, destinationId, clusterId, ex);
}

/// <summary>
/// Writes the "active health probing was cancelled" message for the given destination and cluster.
/// No exception is attached to the log entry.
/// </summary>
/// <param name="logger">Logger that receives the entry.</param>
/// <param name="destinationId">Id of the destination whose probe was cancelled.</param>
/// <param name="clusterId">Id of the cluster that owns the destination.</param>
public static void ActiveHealthProbeCancelledOnDestination(ILogger logger, string destinationId, string clusterId)
    => _activeHealthProbeCancelledOnDestination(logger, destinationId, clusterId, null);

public static void StartingActiveHealthProbingOnCluster(ILogger logger, string clusterId)
{
_startingActiveHealthProbingOnCluster(logger, clusterId, null);
Expand Down
16 changes: 13 additions & 3 deletions src/ReverseProxy/Health/ActiveHealthCheckMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,17 @@ private async Task<DestinationProbingResult> ProbeDestinationAsync(ClusterState
probeActivity?.AddTag("proxy.cluster_id", cluster.ClusterId);
probeActivity?.AddTag("proxy.destination_id", destination.DestinationId);

using var cts = new CancellationTokenSource(timeout);

HttpRequestMessage request;
try
{
request = _probingRequestFactory.CreateRequest(cluster.Model, destination.Model);
request = await _probingRequestFactory.CreateRequestAsync(cluster, destination, cts.Token);
}
catch (OperationCanceledException oce) when (!cts.IsCancellationRequested)
{
Log.ActiveHealthProbeCancelledOnDestination(_logger, destination.DestinationId, cluster.ClusterId);
return new DestinationProbingResult(destination, null, oce);
}
catch (Exception ex)
{
Expand All @@ -186,8 +193,6 @@ private async Task<DestinationProbingResult> ProbeDestinationAsync(ClusterState
return new DestinationProbingResult(destination, null, ex);
}

using var cts = new CancellationTokenSource(timeout);

try
{
Log.SendingHealthProbeToEndpointOfDestination(_logger, request.RequestUri, destination.DestinationId, cluster.ClusterId);
Expand All @@ -198,6 +203,11 @@ private async Task<DestinationProbingResult> ProbeDestinationAsync(ClusterState

return new DestinationProbingResult(destination, response, null);
}
catch (OperationCanceledException oce) when (!cts.IsCancellationRequested)
{
Log.ActiveHealthProbeCancelledOnDestination(_logger, destination.DestinationId, cluster.ClusterId);
return new DestinationProbingResult(destination, null, oce);
}
catch (Exception ex)
{
Log.DestinationProbingFailed(_logger, destination.DestinationId, cluster.ClusterId, ex);
Expand Down
5 changes: 5 additions & 0 deletions src/ReverseProxy/Health/DefaultProbingRequestFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
using System.Net;
using System.Net.Http;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Http.Extensions;
using Microsoft.Net.Http.Headers;
Expand Down Expand Up @@ -39,4 +41,7 @@ public HttpRequestMessage CreateRequest(ClusterModel cluster, DestinationModel d

return request;
}

/// <inheritdoc />
public ValueTask<HttpRequestMessage> CreateRequestAsync(ClusterState cluster, DestinationState destination, CancellationToken cancellationToken = default)
{
    // The request is built synchronously by CreateRequest from the current models, so the result
    // is handed back as an already-completed ValueTask. The cancellation token is not consulted.
    var probeRequest = CreateRequest(cluster.Model, destination.Model);
    return ValueTask.FromResult(probeRequest);
}
}
12 changes: 12 additions & 0 deletions src/ReverseProxy/Health/IProbingRequestFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Yarp.ReverseProxy.Model;

namespace Yarp.ReverseProxy.Health;
Expand All @@ -18,4 +20,14 @@ public interface IProbingRequestFactory
/// <param name="destination">The destination being probed.</param>
/// <returns>Probing <see cref="HttpRequestMessage"/>.</returns>
HttpRequestMessage CreateRequest(ClusterModel cluster, DestinationModel destination);

/// <summary>
/// Asynchronously creates a probing request.
/// </summary>
/// <remarks>
/// The default implementation forwards to <see cref="CreateRequest(ClusterModel, DestinationModel)"/>,
/// passing <c>cluster.Model</c> and <c>destination.Model</c>, and wraps the result in a completed
/// <see cref="ValueTask{TResult}"/>. It does not observe <paramref name="cancellationToken"/>.
/// Implementers that only provide <see cref="CreateRequest(ClusterModel, DestinationModel)"/> therefore
/// get this method without further work.
/// </remarks>
/// <param name="cluster">The cluster being probed.</param>
/// <param name="destination">The destination being probed.</param>
/// <param name="cancellationToken">A token to cancel the operation.</param>
/// <returns>Probing <see cref="HttpRequestMessage"/>.</returns>
ValueTask<HttpRequestMessage> CreateRequestAsync(ClusterState cluster, DestinationState destination, CancellationToken cancellationToken = default) =>
ValueTask.FromResult(CreateRequest(cluster.Model, destination.Model));
}
1 change: 1 addition & 0 deletions src/ReverseProxy/Utilities/EventIds.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,5 @@ internal static class EventIds
public static readonly EventId DelegationQueueNoLongerExists = new(65, nameof(DelegationQueueNoLongerExists));
public static readonly EventId ForwardingRequestCancelled = new(66, nameof(ForwardingRequestCancelled));
public static readonly EventId DelegationQueueDisposed = new(67, nameof(DelegationQueueDisposed));
public static readonly EventId ActiveHealthProbeCancelledOnDestination = new(68, nameof(ActiveHealthProbeCancelledOnDestination));
}
162 changes: 154 additions & 8 deletions test/ReverseProxy.Tests/Health/ActiveHealthCheckMonitorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,30 +58,173 @@ public async Task CheckHealthAsync_ActiveHealthCheckIsEnabledForCluster_SendProb
VerifySentProbeAndResult(cluster2, httpClient2, policy1, new[] { ("https://localhost:20000/cluster2/api/health/", 1), ("https://localhost:20001/cluster2/api/health/", 1) });
}

// A User-Agent header placed on the probe by a custom request factory must be sent as-is.
// Runs once per factory API: the legacy synchronous CreateRequest (reached through the default
// interface implementation of CreateRequestAsync) and a directly mocked CreateRequestAsync.
[Theory]
[InlineData(false)] // Legacy API: only CreateRequest (Models) is set up
[InlineData(true)] // New API: CreateRequestAsync (State) is set up
public async Task CheckHealthAsync_CustomUserAgentSpecified_UserAgentUnchanged(bool overrideAsyncMethod)
{
    const string ProbeUri = "https://localhost:20000/cluster/api/health/";

    // Builds a fresh probe request carrying the custom User-Agent on every call.
    HttpRequestMessage BuildProbeRequest()
    {
        var probe = new HttpRequestMessage(HttpMethod.Get, ProbeUri);
        probe.Headers.UserAgent.ParseAdd("FooBar/9001");
        return probe;
    }

    var healthPolicy = new Mock<IActiveHealthCheckPolicy>();
    healthPolicy.SetupGet(p => p.Name).Returns("policy");

    var factoryMock = new Mock<IProbingRequestFactory>();
    if (!overrideAsyncMethod)
    {
        // Only the synchronous method is set up; CreateRequestAsync is expected to reach it
        // through the interface's default implementation.
        factoryMock.Setup(p => p.CreateRequest(It.IsAny<ClusterModel>(), It.IsAny<DestinationModel>()))
            .Returns(() => BuildProbeRequest());

        // CallBase lets the mock execute the default interface implementation of CreateRequestAsync.
        factoryMock.CallBase = true;
    }
    else
    {
        factoryMock.Setup(p => p.CreateRequestAsync(It.IsAny<ClusterState>(), It.IsAny<DestinationState>(), It.IsAny<CancellationToken>()))
            .Returns(() => ValueTask.FromResult(BuildProbeRequest()));
    }

    var monitorOptions = Options.Create(new ActiveHealthCheckMonitorOptions());
    var policies = new[] { healthPolicy.Object };
    var monitor = new ActiveHealthCheckMonitor(monitorOptions, policies, factoryMock.Object, new Mock<TimeProvider>().Object, GetLogger());

    var probeClient = GetHttpClient();
    var cluster = GetClusterInfo("cluster", "policy", true, probeClient.Object, destinationCount: 1);

    await monitor.CheckHealthAsync(new[] { cluster });

    VerifySentProbeAndResult(cluster, probeClient, healthPolicy, new[] { (ProbeUri, 1) }, userAgent: @"^FooBar\/9001$");
}

[Fact]
public async Task CheckHealthAsync_CustomUserAgentSpecified_UserAgentUnchanged()
public async Task CheckHealthAsync_FactoryCancelledExternally_ProbePassedToPolicyWithException()
{
var policy = new Mock<IActiveHealthCheckPolicy>();
policy.SetupGet(p => p.Name).Returns("policy");

var externalCts = new CancellationTokenSource();
var requestFactory = new Mock<IProbingRequestFactory>();
requestFactory.Setup(p => p.CreateRequest(It.IsAny<ClusterModel>(), It.IsAny<DestinationModel>()))
.Returns((ClusterModel cluster, DestinationModel destination) =>

// First destination: factory throws OperationCanceledException (external cancellation)
// Second destination: succeeds normally
var callCount = 0;
requestFactory.Setup(p => p.CreateRequestAsync(It.IsAny<ClusterState>(), It.IsAny<DestinationState>(), It.IsAny<CancellationToken>()))
.Returns<ClusterState, DestinationState, CancellationToken>((cluster, destination, ct) =>
{
var request = new HttpRequestMessage(HttpMethod.Get, "https://localhost:20000/cluster/api/health/");
request.Headers.UserAgent.ParseAdd("FooBar/9001");
return request;
callCount++;
if (callCount == 1)
{
// Simulate external cancellation (not timeout) - throw without the timeout CTS being cancelled
throw new OperationCanceledException(externalCts.Token);
}

return ValueTask.FromResult(new HttpRequestMessage(HttpMethod.Get, $"https://localhost:20000/{destination.DestinationId}/health/"));
});

var options = Options.Create(new ActiveHealthCheckMonitorOptions());
var options = Options.Create(new ActiveHealthCheckMonitorOptions { DefaultTimeout = TimeSpan.FromSeconds(30) });
var monitor = new ActiveHealthCheckMonitor(options, new[] { policy.Object }, requestFactory.Object, new Mock<TimeProvider>().Object, GetLogger());

var httpClient = GetHttpClient();
var cluster = GetClusterInfo("cluster", "policy", true, httpClient.Object, destinationCount: 2);

await monitor.CheckHealthAsync(new[] { cluster });

// Policy should receive 2 results: one with exception (cancelled), one successful
policy.Verify(
p => p.ProbingCompleted(
cluster,
It.Is<IReadOnlyList<DestinationProbingResult>>(r =>
r.Count == 2 &&
r.Any(x => x.Exception is OperationCanceledException) &&
r.Any(x => x.Response != null && x.Response.StatusCode == HttpStatusCode.OK)))
, Times.Once);
policy.Verify(p => p.Name);
policy.VerifyNoOtherCalls();
}

// When SendAsync throws an OperationCanceledException that does not come from the probe timeout,
// the affected destination's result must still reach the policy, carrying the exception, and the
// remaining destinations must be probed normally.
[Fact]
public async Task CheckHealthAsync_SendAsyncCancelledExternally_ProbePassedToPolicyWithException()
{
    var policy = new Mock<IActiveHealthCheckPolicy>();
    policy.SetupGet(p => p.Name).Returns("policy");

    // Disposed at the end of the test; only its token is needed, to tag the simulated cancellation.
    using var externalCts = new CancellationTokenSource();

    // The long timeout ensures the monitor's own timeout source is not what triggers the cancellation path.
    var options = Options.Create(new ActiveHealthCheckMonitorOptions { DefaultTimeout = TimeSpan.FromSeconds(30) });
    var monitor = new ActiveHealthCheckMonitor(options, new[] { policy.Object }, new DefaultProbingRequestFactory(), new Mock<TimeProvider>().Object, GetLogger());

    // First destination: SendAsync throws OperationCanceledException (external cancellation)
    // Second destination: succeeds normally
    var callCount = 0;
    var httpClient = new Mock<HttpMessageInvoker>(() => new HttpMessageInvoker(new Mock<HttpMessageHandler>().Object));
    httpClient.Setup(c => c.SendAsync(It.IsAny<HttpRequestMessage>(), It.IsAny<CancellationToken>()))
        .Returns<HttpRequestMessage, CancellationToken>((request, ct) =>
        {
            callCount++;
            if (callCount == 1)
            {
                // Simulate external cancellation (not timeout) - throw without the timeout CTS being cancelled
                throw new OperationCanceledException(externalCts.Token);
            }

            return Task.FromResult(new HttpResponseMessage(HttpStatusCode.OK) { Version = request.Version });
        });

    var cluster = GetClusterInfo("cluster", "policy", true, httpClient.Object, destinationCount: 2);

    await monitor.CheckHealthAsync(new[] { cluster });

    // Policy should receive 2 results: one with exception (cancelled), one successful
    policy.Verify(
        p => p.ProbingCompleted(
            cluster,
            It.Is<IReadOnlyList<DestinationProbingResult>>(r =>
                r.Count == 2 &&
                r.Any(x => x.Exception is OperationCanceledException) &&
                r.Any(x => x.Response != null && x.Response.StatusCode == HttpStatusCode.OK))),
        Times.Once);
    policy.Verify(p => p.Name);
    policy.VerifyNoOtherCalls();
}

[Fact]
public async Task CheckHealthAsync_TimeoutCancellation_TreatedAsError()
{
var policy = new Mock<IActiveHealthCheckPolicy>();
policy.SetupGet(p => p.Name).Returns("policy");

var options = Options.Create(new ActiveHealthCheckMonitorOptions { DefaultTimeout = TimeSpan.FromMilliseconds(1) });
var monitor = new ActiveHealthCheckMonitor(options, new[] { policy.Object }, new DefaultProbingRequestFactory(), new Mock<TimeProvider>().Object, GetLogger());

var tcs = new TaskCompletionSource<HttpResponseMessage>();
var httpClient = new Mock<HttpMessageInvoker>(() => new HttpMessageInvoker(new Mock<HttpMessageHandler>().Object));
httpClient.Setup(c => c.SendAsync(It.IsAny<HttpRequestMessage>(), It.IsAny<CancellationToken>()))
.Returns<HttpRequestMessage, CancellationToken>((request, ct) =>
{
// Register callback to cancel the TCS when timeout occurs
ct.Register(() => tcs.TrySetCanceled(ct));
return tcs.Task;
});

var cluster = GetClusterInfo("cluster", "policy", true, httpClient.Object, destinationCount: 1);

await monitor.CheckHealthAsync(new[] { cluster });

VerifySentProbeAndResult(cluster, httpClient, policy, new[] { ("https://localhost:20000/cluster/api/health/", 1) }, userAgent: @"^FooBar\/9001$");
// Policy should receive 1 result with an exception (timeout is an error, not skipped)
policy.Verify(
p => p.ProbingCompleted(
cluster,
It.Is<IReadOnlyList<DestinationProbingResult>>(r =>
r.Count == 1 &&
r[0].Response == null &&
r[0].Exception is OperationCanceledException)),
Times.Once);
policy.Verify(p => p.Name);
policy.VerifyNoOtherCalls();
}

[Fact]
Expand Down Expand Up @@ -282,6 +425,9 @@ public async Task ProbeCluster_ClusterChanged_StopSendingProbes()

timeProvider.FireAllTimers();

Assert.Equal(2, timeProvider.TimerCount);
timeProvider.VerifyTimer(0, Interval0);
timeProvider.VerifyTimer(1, Interval1);
VerifySentProbeAndResult(cluster0, httpClient0, policy0, new[] { ("https://localhost:20000/cluster0/api/health/", 1), ("https://localhost:20001/cluster0/api/health/", 1) }, policyCallTimes: 1);
VerifySentProbeAndResult(cluster2, httpClient2, policy1, new[] { ("https://localhost:20000/cluster2/api/health/", 1), ("https://localhost:20001/cluster2/api/health/", 1) }, policyCallTimes: 1);

Expand Down
Loading