Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moved cosmos job hosting to use unified retry logic #4773

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Microsoft.Health.Core.Features.Context;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Configs;
using Microsoft.Health.Fhir.Core.Features.Context;
using Microsoft.Health.Fhir.Core.Features.Operations;
using Microsoft.Health.Fhir.Core.Features.Operations.Reindex;
using Microsoft.Health.Fhir.Core.Features.Operations.Reindex.Models;
Expand Down Expand Up @@ -59,6 +61,7 @@ public ReindexJobWorkerTests()
Options.Create(_reindexJobConfiguration),
_reindexJobTask.CreateMockScopeProvider(),
searchParameterOperations,
Substitute.For<RequestContextAccessor<IFhirRequestContext>>(),
NullLogger<ReindexJobWorker>.Instance);

_reindexJobWorker.Handle(new Messages.Search.SearchParametersInitializedNotification(), CancellationToken.None);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
using MediatR;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Primitives;
using Microsoft.Health.Core.Features.Context;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Configs;
using Microsoft.Health.Fhir.Core.Features.Context;
using Microsoft.Health.Fhir.Core.Features.Operations.Reindex.Models;
using Microsoft.Health.Fhir.Core.Features.Search.Parameters;
using Microsoft.Health.Fhir.Core.Messages.Search;
Expand All @@ -29,6 +32,7 @@ public class ReindexJobWorker : INotificationHandler<SearchParametersInitialized
private readonly ReindexJobConfiguration _reindexJobConfiguration;
private readonly IScopeProvider<IReindexJobTask> _reindexJobTaskFactory;
private readonly ISearchParameterOperations _searchParameterOperations;
private readonly RequestContextAccessor<IFhirRequestContext> _contextAccessor;
private readonly ILogger _logger;
private bool _searchParametersInitialized = false;

Expand All @@ -37,19 +41,15 @@ public ReindexJobWorker(
IOptions<ReindexJobConfiguration> reindexJobConfiguration,
IScopeProvider<IReindexJobTask> reindexJobTaskFactory,
ISearchParameterOperations searchParameterOperations,
RequestContextAccessor<IFhirRequestContext> contextAccessor,
ILogger<ReindexJobWorker> logger)
{
EnsureArg.IsNotNull(fhirOperationDataStoreFactory, nameof(fhirOperationDataStoreFactory));
EnsureArg.IsNotNull(reindexJobConfiguration?.Value, nameof(reindexJobConfiguration));
EnsureArg.IsNotNull(reindexJobTaskFactory, nameof(reindexJobTaskFactory));
EnsureArg.IsNotNull(searchParameterOperations, nameof(searchParameterOperations));
EnsureArg.IsNotNull(logger, nameof(logger));

_fhirOperationDataStoreFactory = fhirOperationDataStoreFactory;
_reindexJobConfiguration = reindexJobConfiguration.Value;
_reindexJobTaskFactory = reindexJobTaskFactory;
_searchParameterOperations = searchParameterOperations;
_logger = logger;
_fhirOperationDataStoreFactory = EnsureArg.IsNotNull(fhirOperationDataStoreFactory, nameof(fhirOperationDataStoreFactory));
_reindexJobConfiguration = EnsureArg.IsNotNull(reindexJobConfiguration?.Value, nameof(reindexJobConfiguration));
_reindexJobTaskFactory = EnsureArg.IsNotNull(reindexJobTaskFactory, nameof(reindexJobTaskFactory));
_searchParameterOperations = EnsureArg.IsNotNull(searchParameterOperations, nameof(searchParameterOperations));
_contextAccessor = EnsureArg.IsNotNull(contextAccessor, nameof(contextAccessor));
_logger = EnsureArg.IsNotNull(logger, nameof(logger));
}

public async Task ExecuteAsync(CancellationToken cancellationToken)
Expand All @@ -60,6 +60,20 @@ public async Task ExecuteAsync(CancellationToken cancellationToken)
{
if (_searchParametersInitialized)
{
// Create a background task context to trigger the correct retry policy.
var fhirRequestContext = new FhirRequestContext(
method: nameof(ReindexJobWorker),
uriString: nameof(ReindexJobWorker),
baseUriString: nameof(ReindexJobWorker),
correlationId: Guid.NewGuid().ToString(),
requestHeaders: new Dictionary<string, StringValues>(),
responseHeaders: new Dictionary<string, StringValues>())
{
IsBackgroundTask = true,
};

_contextAccessor.RequestContext = fhirRequestContext;

// Check for any changes to Search Parameters
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public CosmosFhirDataStoreTests()
_cosmosDataStoreConfiguration,
Substitute.For<IOptionsMonitor<CosmosCollectionConfiguration>>(),
_cosmosQueryFactory,
new RetryExceptionPolicyFactory(_cosmosDataStoreConfiguration, requestContextAccessor),
new RetryExceptionPolicyFactory(_cosmosDataStoreConfiguration, requestContextAccessor, NullLogger<RetryExceptionPolicyFactory>.Instance),
NullLogger<CosmosFhirDataStore>.Instance,
Options.Create(new CoreFeatureConfiguration()),
_bundleOrchestrator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public FhirCosmosClientInitializerTests()
_initializer = new FhirCosmosClientInitializer(
clientTestProvider,
() => new[] { new TestRequestHandler() },
new RetryExceptionPolicyFactory(_cosmosDataStoreConfiguration, Substitute.For<RequestContextAccessor<IFhirRequestContext>>()),
new RetryExceptionPolicyFactory(_cosmosDataStoreConfiguration, Substitute.For<RequestContextAccessor<IFhirRequestContext>>(), NullLogger<RetryExceptionPolicyFactory>.Instance),
Substitute.For<CosmosAccessTokenProviderFactory>(),
NullLogger<FhirCosmosClientInitializer>.Instance);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,21 @@
using System.Threading.Tasks;
using EnsureThat;
using Microsoft.Azure.Cosmos;
using Microsoft.Build.Framework;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Primitives;
using Microsoft.Health.Abstractions.Exceptions;
using Microsoft.Health.Core;
using Microsoft.Health.Core.Extensions;
using Microsoft.Health.Core.Features.Context;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Extensions;
using Microsoft.Health.Fhir.Core.Features.Context;
using Microsoft.Health.Fhir.CosmosDb.Core.Features.Storage;
using Microsoft.Health.Fhir.CosmosDb.Features.Queries;
using Microsoft.Health.JobManagement;
using Polly;
using Polly.Retry;

namespace Microsoft.Health.Fhir.CosmosDb.Features.Storage.Queues;

Expand All @@ -29,20 +35,24 @@ public class CosmosQueueClient : IQueueClient
private readonly Func<IScoped<Container>> _containerFactory;
private readonly ICosmosQueryFactory _queryFactory;
private readonly ICosmosDbDistributedLockFactory _distributedLockFactory;
private static readonly AsyncPolicy _retryPolicy = Policy
.Handle<CosmosException>(ex => ex.StatusCode == HttpStatusCode.PreconditionFailed)
.Or<CosmosException>(ex => ex.StatusCode == HttpStatusCode.TooManyRequests)
.Or<RequestRateExceededException>()
.WaitAndRetryAsync(5, _ => TimeSpan.FromMilliseconds(RandomNumberGenerator.GetInt32(100, 1000)));
private readonly RetryExceptionPolicyFactory _retryExceptionPolicyFactory;
private readonly ILogger<CosmosQueueClient> _logger;
private readonly AsyncPolicy _retryPolicy;

public CosmosQueueClient(
Func<IScoped<Container>> containerFactory,
ICosmosQueryFactory queryFactory,
ICosmosDbDistributedLockFactory distributedLockFactory)
ICosmosDbDistributedLockFactory distributedLockFactory,
RetryExceptionPolicyFactory retryExceptionPolicyFactor,
ILogger<CosmosQueueClient> logger)
{
_containerFactory = EnsureArg.IsNotNull(containerFactory, nameof(containerFactory));
_queryFactory = EnsureArg.IsNotNull(queryFactory, nameof(queryFactory));
_distributedLockFactory = EnsureArg.IsNotNull(distributedLockFactory, nameof(distributedLockFactory));
_retryExceptionPolicyFactory = EnsureArg.IsNotNull(retryExceptionPolicyFactor, nameof(retryExceptionPolicyFactor));
_logger = EnsureArg.IsNotNull(logger, nameof(logger));

_retryPolicy = _retryExceptionPolicyFactory.BackgroundWorkerRetryPolicy;
}

public bool IsInitialized() => true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
// -------------------------------------------------------------------------------------------------

using System;
using System.Net;
using System.Runtime.ExceptionServices;
using System.Security.Cryptography;
using System.Threading.Tasks;
using EnsureThat;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Logging;
using Microsoft.Health.Abstractions.Exceptions;
using Microsoft.Health.Core.Features.Context;
using Microsoft.Health.Fhir.Core.Extensions;
Expand All @@ -22,23 +25,24 @@
{
private const string RetryEndTimeContextKey = "RetryEndTime";
private readonly RequestContextAccessor<IFhirRequestContext> _requestContextAccessor;
private readonly ILogger<RetryExceptionPolicyFactory> _logger;
private readonly AsyncPolicy _sdkOnlyRetryPolicy;
private readonly AsyncPolicy _bundleActionRetryPolicy;
private readonly AsyncPolicy _backgroundJobRetryPolicy;

public RetryExceptionPolicyFactory(CosmosDataStoreConfiguration configuration, RequestContextAccessor<IFhirRequestContext> requestContextAccessor)
public RetryExceptionPolicyFactory(CosmosDataStoreConfiguration configuration, RequestContextAccessor<IFhirRequestContext> requestContextAccessor, ILogger<RetryExceptionPolicyFactory> logger)
{
_requestContextAccessor = requestContextAccessor;
_requestContextAccessor = EnsureArg.IsNotNull(requestContextAccessor, nameof(requestContextAccessor));
EnsureArg.IsNotNull(configuration, nameof(configuration));
EnsureArg.IsNotNull(requestContextAccessor, nameof(requestContextAccessor));
_logger = EnsureArg.IsNotNull(logger, nameof(logger));

_sdkOnlyRetryPolicy = Policy.NoOpAsync();

_bundleActionRetryPolicy = configuration.IndividualBatchActionRetryOptions.MaxNumberOfRetries > 0
? CreateExtendedRetryPolicy(configuration.IndividualBatchActionRetryOptions.MaxNumberOfRetries / configuration.RetryOptions.MaxNumberOfRetries, configuration.IndividualBatchActionRetryOptions.MaxWaitTimeInSeconds)
: Policy.NoOpAsync();

_backgroundJobRetryPolicy = CreateExtendedRetryPolicy(100, -1);
_backgroundJobRetryPolicy = CreateExtendedRetryPolicy(100, -1, true);
}

public AsyncPolicy RetryPolicy
Expand All @@ -54,16 +58,60 @@
}
}

private static AsyncRetryPolicy CreateExtendedRetryPolicy(int maxRetries, int maxWaitTimeInSeconds)
public AsyncPolicy BackgroundWorkerRetryPolicy => _backgroundJobRetryPolicy;

private AsyncRetryPolicy CreateExtendedRetryPolicy(int maxRetries, int maxWaitTimeInSeconds, bool useExponentialRetry = false)
{
// Define a sleep duration provider based on the retry strategy
TimeSpan SleepDurationProvider(int retryAttempt, Exception exception)
{
// Respect x-ms-retry-after-ms from RequestRateExceededException
if (exception.AsRequestRateExceeded()?.RetryAfter is TimeSpan retryAfter)
{
return retryAfter;
}

// Respect x-ms-retry-after-ms from CosmosException
if (exception is CosmosException cosmosException && cosmosException.StatusCode == HttpStatusCode.TooManyRequests && cosmosException.RetryAfter.HasValue)
{
return cosmosException.RetryAfter.Value;
}

if (useExponentialRetry)
{
// Exponential backoff with jitter
var backoff = Math.Pow(2, retryAttempt) * 100; // Exponential backoff in milliseconds
var jitter = RandomNumberGenerator.GetInt32(0, 300); // Add jitter in milliseconds
return TimeSpan.FromMilliseconds(backoff + jitter);
}

// Default fixed wait time
return TimeSpan.FromSeconds(2);
}

// Retry recommendations for Cosmos DB: https://learn.microsoft.com/azure/cosmos-db/nosql/conceptual-resilient-sdk-applications#should-my-application-retry-on-errors
return Policy.Handle<RequestRateExceededException>()
.Or<CosmosException>(e => e.IsRequestRateExceeded())
.Or<CosmosException>(e => (e.StatusCode == System.Net.HttpStatusCode.ServiceUnavailable || e.StatusCode == System.Net.HttpStatusCode.RequestTimeout))
.Or<CosmosException>(e =>
e.StatusCode == System.Net.HttpStatusCode.ServiceUnavailable ||
e.StatusCode == System.Net.HttpStatusCode.TooManyRequests ||
e.StatusCode == System.Net.HttpStatusCode.Gone ||
e.StatusCode == (HttpStatusCode)449 || // "Retry with" status code
e.StatusCode == System.Net.HttpStatusCode.RequestTimeout)
.WaitAndRetryAsync(
retryCount: maxRetries,
sleepDurationProvider: (_, e, _) => e.AsRequestRateExceeded()?.RetryAfter ?? TimeSpan.FromSeconds(2),
sleepDurationProvider: (retryAttempt, exception, context) => SleepDurationProvider(retryAttempt, exception),
onRetryAsync: (e, _, _, ctx) =>
{
if (e is CosmosException cosmosException)
{
if (cosmosException.StatusCode == HttpStatusCode.ServiceUnavailable)
{
var diagnostics = cosmosException.Diagnostics.ToString();
_logger.LogWarning(cosmosException, "Received a ServiceUnavailable response from Cosmos DB. Retrying. Diagnostics: {CosmosDiagnostics}", diagnostics);
}
}

if (maxWaitTimeInSeconds == -1)
{
// no timeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,7 @@ private async Task<CreateReindexResponse> SetUpForReindexing(CreateReindexReques
Options.Create(_jobConfiguration),
InitializeReindexJobTask().CreateMockScopeProvider(),
_searchParameterOperations,
Substitute.For<RequestContextAccessor<IFhirRequestContext>>(),
NullLogger<ReindexJobWorker>.Instance);

await _reindexJobWorker.Handle(new SearchParametersInitializedNotification(), CancellationToken.None);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public virtual async Task InitializeAsync()

var responseProcessor = new CosmosResponseProcessor(_fhirRequestContextAccessor, mediator, Substitute.For<ICosmosQueryLogger>(), NullLogger<CosmosResponseProcessor>.Instance);
var handler = new FhirCosmosResponseHandler(() => new NonDisposingScope(_container), _cosmosDataStoreConfiguration, _fhirRequestContextAccessor, responseProcessor);
var retryExceptionPolicyFactory = new RetryExceptionPolicyFactory(_cosmosDataStoreConfiguration, _fhirRequestContextAccessor);
var retryExceptionPolicyFactory = new RetryExceptionPolicyFactory(_cosmosDataStoreConfiguration, _fhirRequestContextAccessor, NullLogger<RetryExceptionPolicyFactory>.Instance);
var documentClientInitializer = new FhirCosmosClientInitializer(
testProvider,
() => new[] { handler },
Expand Down Expand Up @@ -232,7 +232,9 @@ public virtual async Task InitializeAsync()
_queueClient = new CosmosQueueClient(
() => _container.CreateMockScope(),
new CosmosQueryFactory(Substitute.For<ICosmosResponseProcessor>(), Substitute.For<ICosmosQueryLogger>()),
new CosmosDbDistributedLockFactory(() => _container.CreateMockScope(), NullLogger<CosmosDbDistributedLock>.Instance));
new CosmosDbDistributedLockFactory(() => _container.CreateMockScope(), NullLogger<CosmosDbDistributedLock>.Instance),
retryExceptionPolicyFactory,
NullLogger<CosmosQueueClient>.Instance);

_cosmosFhirOperationDataStore = new CosmosFhirOperationDataStore(
_queueClient,
Expand Down
Loading