diff --git a/src/HealthChecks.AzureServiceBus/AzureServiceBusQueueHealthCheck.cs b/src/HealthChecks.AzureServiceBus/AzureServiceBusQueueHealthCheck.cs index aeb694c0c7..9fb469f8f3 100644 --- a/src/HealthChecks.AzureServiceBus/AzureServiceBusQueueHealthCheck.cs +++ b/src/HealthChecks.AzureServiceBus/AzureServiceBusQueueHealthCheck.cs @@ -43,8 +43,15 @@ async Task CheckWithReceiver() _queueKey, _ => client.CreateReceiver(Options.QueueName)) .ConfigureAwait(false); - - await receiver.PeekMessageAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + var cancel_task = Task.Delay(Timeout.Infinite, cts.Token); + var peek_task = receiver.PeekMessageAsync(cancellationToken: cancellationToken); + var winner = await Task.WhenAny(peek_task, cancel_task).ConfigureAwait(false); + if (winner == peek_task) + { + cts.Cancel(); + } + await winner.ConfigureAwait(false); } Task CheckWithManagement() diff --git a/src/HealthChecks.AzureServiceBus/AzureServiceBusSubscriptionHealthCheck.cs b/src/HealthChecks.AzureServiceBus/AzureServiceBusSubscriptionHealthCheck.cs index de30992271..09ec0a00e2 100644 --- a/src/HealthChecks.AzureServiceBus/AzureServiceBusSubscriptionHealthCheck.cs +++ b/src/HealthChecks.AzureServiceBus/AzureServiceBusSubscriptionHealthCheck.cs @@ -45,7 +45,15 @@ async Task CheckWithReceiver() _ => client.CreateReceiver(Options.TopicName, Options.SubscriptionName)) .ConfigureAwait(false); - await receiver.PeekMessageAsync(cancellationToken: cancellationToken).ConfigureAwait(false); + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + var cancel_task = Task.Delay(Timeout.Infinite, cts.Token); + var peek_task = receiver.PeekMessageAsync(cancellationToken: cancellationToken); + var winner = await Task.WhenAny(peek_task, cancel_task).ConfigureAwait(false); + if (winner == peek_task) + { + cts.Cancel(); + } + await winner.ConfigureAwait(false); } Task CheckWithManagement() diff --git a/test/HealthChecks.AzureServiceBus.Tests/AzureServiceBusQueueHealthCheckTests.cs b/test/HealthChecks.AzureServiceBus.Tests/AzureServiceBusQueueHealthCheckTests.cs index e186d5ae37..c26686fbc2 100644 --- a/test/HealthChecks.AzureServiceBus.Tests/AzureServiceBusQueueHealthCheckTests.cs +++ b/test/HealthChecks.AzureServiceBus.Tests/AzureServiceBusQueueHealthCheckTests.cs @@ -1,9 +1,11 @@ +using System.Diagnostics; using Azure.Core; using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus.Administration; using HealthChecks.AzureServiceBus.Configuration; using NSubstitute; using NSubstitute.ExceptionExtensions; +using Xunit.Abstractions; namespace HealthChecks.AzureServiceBus.Tests; @@ -19,8 +21,9 @@ public class azureservicebusqueuehealthcheck_should private readonly ServiceBusClientProvider _clientProvider; private readonly ServiceBusAdministrationClient _serviceBusAdministrationClient; private readonly TokenCredential _tokenCredential; + private readonly ITestOutputHelper _output; - public azureservicebusqueuehealthcheck_should() + public azureservicebusqueuehealthcheck_should(ITestOutputHelper output) { ConnectionString = Guid.NewGuid().ToString(); FullyQualifiedName = Guid.NewGuid().ToString(); @@ -37,6 +40,7 @@ public azureservicebusqueuehealthcheck_should() _clientProvider.CreateManagementClient(ConnectionString).Returns(_serviceBusAdministrationClient); _clientProvider.CreateManagementClient(FullyQualifiedName, _tokenCredential).Returns(_serviceBusAdministrationClient); _serviceBusClient.CreateReceiver(QueueName).Returns(_serviceBusReceiver); + _output = output; } [Theory] @@ -263,6 +267,38 @@ await _serviceBusReceiver .PeekMessageAsync(cancellationToken: tokenSource.Token); } + [Fact] + public async Task respect_cancellation_token_when_using_peek() + { + _serviceBusReceiver + .PeekMessageAsync(cancellationToken: default) + .ReturnsForAnyArgs(async Task (call_info) => + { + await Task.Delay(TimeSpan.FromSeconds(5)); + throw new Exception(); + }); + + using var tokenSource = new CancellationTokenSource(TimeSpan.FromMilliseconds(100)); + var sw = new Stopwatch(); + sw.Start(); + var actual = await ExecuteHealthCheckAsync( + QueueName, + true, + connectionString: ConnectionString, + cancellationToken: tokenSource.Token); + sw.Stop(); + actual.Status.ShouldBe(HealthStatus.Unhealthy); + sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(1)); + + _serviceBusClient + .Received(1) + .CreateReceiver(QueueName); + + await _serviceBusReceiver + .Received(1) + .PeekMessageAsync(cancellationToken: tokenSource.Token); + } + [Fact] public async Task return_healthy_when_checking_healthy_service_through_administration_and_connection_string() { diff --git a/test/HealthChecks.AzureServiceBus.Tests/AzureServiceBusSubscriptionHealthCheckTests.cs b/test/HealthChecks.AzureServiceBus.Tests/AzureServiceBusSubscriptionHealthCheckTests.cs index a3de7d96a6..17b30c34f4 100644 --- a/test/HealthChecks.AzureServiceBus.Tests/AzureServiceBusSubscriptionHealthCheckTests.cs +++ b/test/HealthChecks.AzureServiceBus.Tests/AzureServiceBusSubscriptionHealthCheckTests.cs @@ -1,3 +1,4 @@ +using System.Diagnostics; using Azure.Core; using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus.Administration; @@ -265,6 +266,38 @@ await _serviceBusReceiver .PeekMessageAsync(cancellationToken: tokenSource.Token); } + [Fact] + public async Task respect_cancellation_token_when_using_peek() + { + _serviceBusReceiver + .PeekMessageAsync(cancellationToken: default) + .ReturnsForAnyArgs(async Task (call_info) => + { + await Task.Delay(TimeSpan.FromSeconds(5)); + throw new Exception(); + }); + + using var tokenSource = new CancellationTokenSource(TimeSpan.FromMilliseconds(100)); + var sw = new Stopwatch(); + sw.Start(); + var actual = await ExecuteHealthCheckAsync( + TopicName, + true, + connectionString: ConnectionString, + cancellationToken: tokenSource.Token); + sw.Stop(); + actual.Status.ShouldBe(HealthStatus.Unhealthy); + sw.Elapsed.ShouldBeLessThan(TimeSpan.FromSeconds(1)); + + _serviceBusClient + .Received(1) + .CreateReceiver(TopicName, SubscriptionName); + + await _serviceBusReceiver + .Received(1) + .PeekMessageAsync(cancellationToken: tokenSource.Token); + } + [Fact] public async Task return_healthy_when_checking_healthy_service_through_administration_and_connection_string() {