Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moved cosmos job hosting to use unified retry logic #4773

Merged
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Microsoft.Health.Core.Features.Context;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Configs;
using Microsoft.Health.Fhir.Core.Features.Context;
using Microsoft.Health.Fhir.Core.Features.Operations;
using Microsoft.Health.Fhir.Core.Features.Operations.Reindex;
using Microsoft.Health.Fhir.Core.Features.Operations.Reindex.Models;
Expand Down Expand Up @@ -59,6 +61,7 @@ public ReindexJobWorkerTests()
Options.Create(_reindexJobConfiguration),
_reindexJobTask.CreateMockScopeProvider(),
searchParameterOperations,
Substitute.For<RequestContextAccessor<IFhirRequestContext>>(),
NullLogger<ReindexJobWorker>.Instance);

_reindexJobWorker.Handle(new Messages.Search.SearchParametersInitializedNotification(), CancellationToken.None);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
using MediatR;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Primitives;
using Microsoft.Health.Core.Features.Context;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Configs;
using Microsoft.Health.Fhir.Core.Features.Context;
using Microsoft.Health.Fhir.Core.Features.Operations.Reindex.Models;
using Microsoft.Health.Fhir.Core.Features.Search.Parameters;
using Microsoft.Health.Fhir.Core.Messages.Search;
Expand All @@ -29,6 +32,7 @@
private readonly ReindexJobConfiguration _reindexJobConfiguration;
private readonly IScopeProvider<IReindexJobTask> _reindexJobTaskFactory;
private readonly ISearchParameterOperations _searchParameterOperations;
private readonly RequestContextAccessor<IFhirRequestContext> _contextAccessor;
private readonly ILogger _logger;
private bool _searchParametersInitialized = false;

Expand All @@ -37,19 +41,15 @@
IOptions<ReindexJobConfiguration> reindexJobConfiguration,
IScopeProvider<IReindexJobTask> reindexJobTaskFactory,
ISearchParameterOperations searchParameterOperations,
RequestContextAccessor<IFhirRequestContext> contextAccessor,
ILogger<ReindexJobWorker> logger)
{
EnsureArg.IsNotNull(fhirOperationDataStoreFactory, nameof(fhirOperationDataStoreFactory));
EnsureArg.IsNotNull(reindexJobConfiguration?.Value, nameof(reindexJobConfiguration));
EnsureArg.IsNotNull(reindexJobTaskFactory, nameof(reindexJobTaskFactory));
EnsureArg.IsNotNull(searchParameterOperations, nameof(searchParameterOperations));
EnsureArg.IsNotNull(logger, nameof(logger));

_fhirOperationDataStoreFactory = fhirOperationDataStoreFactory;
_reindexJobConfiguration = reindexJobConfiguration.Value;
_reindexJobTaskFactory = reindexJobTaskFactory;
_searchParameterOperations = searchParameterOperations;
_logger = logger;
_fhirOperationDataStoreFactory = EnsureArg.IsNotNull(fhirOperationDataStoreFactory, nameof(fhirOperationDataStoreFactory));
_reindexJobConfiguration = EnsureArg.IsNotNull(reindexJobConfiguration?.Value, nameof(reindexJobConfiguration));
_reindexJobTaskFactory = EnsureArg.IsNotNull(reindexJobTaskFactory, nameof(reindexJobTaskFactory));
_searchParameterOperations = EnsureArg.IsNotNull(searchParameterOperations, nameof(searchParameterOperations));
_contextAccessor = EnsureArg.IsNotNull(contextAccessor, nameof(contextAccessor));
_logger = EnsureArg.IsNotNull(logger, nameof(logger));
}

public async Task ExecuteAsync(CancellationToken cancellationToken)
Expand All @@ -60,6 +60,18 @@
{
if (_searchParametersInitialized)
{
// Create a background task context to trigger the correct retry policy.
var fhirRequestContext = new FhirRequestContext(
method: nameof(ReindexJobWorker),
uriString: string.Empty,
baseUriString: string.Empty,
correlationId: string.Empty,
requestHeaders: new Dictionary<string, StringValues>(),
responseHeaders: new Dictionary<string, StringValues>())
{
IsBackgroundTask = true,
};

// Check for any changes to Search Parameters
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,21 @@
using System.Threading.Tasks;
using EnsureThat;
using Microsoft.Azure.Cosmos;
using Microsoft.Build.Framework;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Primitives;
using Microsoft.Health.Abstractions.Exceptions;
using Microsoft.Health.Core;
using Microsoft.Health.Core.Extensions;
using Microsoft.Health.Core.Features.Context;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Extensions;
using Microsoft.Health.Fhir.Core.Features.Context;
using Microsoft.Health.Fhir.CosmosDb.Core.Features.Storage;
using Microsoft.Health.Fhir.CosmosDb.Features.Queries;
using Microsoft.Health.JobManagement;
using Polly;
using Polly.Retry;

namespace Microsoft.Health.Fhir.CosmosDb.Features.Storage.Queues;

Expand All @@ -29,20 +35,24 @@ public class CosmosQueueClient : IQueueClient
private readonly Func<IScoped<Container>> _containerFactory;
private readonly ICosmosQueryFactory _queryFactory;
private readonly ICosmosDbDistributedLockFactory _distributedLockFactory;
private static readonly AsyncPolicy _retryPolicy = Policy
.Handle<CosmosException>(ex => ex.StatusCode == HttpStatusCode.PreconditionFailed)
.Or<CosmosException>(ex => ex.StatusCode == HttpStatusCode.TooManyRequests)
.Or<RequestRateExceededException>()
.WaitAndRetryAsync(5, _ => TimeSpan.FromMilliseconds(RandomNumberGenerator.GetInt32(100, 1000)));
private readonly RetryExceptionPolicyFactory _retryExceptionPolicyFactory;
private readonly ILogger<CosmosQueueClient> _logger;
private readonly AsyncPolicy _retryPolicy;

public CosmosQueueClient(
Func<IScoped<Container>> containerFactory,
ICosmosQueryFactory queryFactory,
ICosmosDbDistributedLockFactory distributedLockFactory)
ICosmosDbDistributedLockFactory distributedLockFactory,
RetryExceptionPolicyFactory retryExceptionPolicyFactor,
ILogger<CosmosQueueClient> logger)
{
_containerFactory = EnsureArg.IsNotNull(containerFactory, nameof(containerFactory));
_queryFactory = EnsureArg.IsNotNull(queryFactory, nameof(queryFactory));
_distributedLockFactory = EnsureArg.IsNotNull(distributedLockFactory, nameof(distributedLockFactory));
_retryExceptionPolicyFactory = EnsureArg.IsNotNull(retryExceptionPolicyFactor, nameof(retryExceptionPolicyFactor));
_logger = EnsureArg.IsNotNull(logger, nameof(logger));

_retryPolicy = _retryExceptionPolicyFactory.BackgroundWorkerRetryPolicy;
}

public bool IsInitialized() => true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public AsyncPolicy RetryPolicy
}
}

public AsyncPolicy BackgroundWorkerRetryPolicy => _backgroundJobRetryPolicy;

private static AsyncRetryPolicy CreateExtendedRetryPolicy(int maxRetries, int maxWaitTimeInSeconds)
{
return Policy.Handle<RequestRateExceededException>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,7 @@ private async Task<CreateReindexResponse> SetUpForReindexing(CreateReindexReques
Options.Create(_jobConfiguration),
InitializeReindexJobTask().CreateMockScopeProvider(),
_searchParameterOperations,
Substitute.For<RequestContextAccessor<IFhirRequestContext>>(),
NullLogger<ReindexJobWorker>.Instance);

await _reindexJobWorker.Handle(new SearchParametersInitializedNotification(), CancellationToken.None);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ public virtual async Task InitializeAsync()
_queueClient = new CosmosQueueClient(
() => _container.CreateMockScope(),
new CosmosQueryFactory(Substitute.For<ICosmosResponseProcessor>(), Substitute.For<ICosmosQueryLogger>()),
new CosmosDbDistributedLockFactory(() => _container.CreateMockScope(), NullLogger<CosmosDbDistributedLock>.Instance));
new CosmosDbDistributedLockFactory(() => _container.CreateMockScope(), NullLogger<CosmosDbDistributedLock>.Instance),
retryExceptionPolicyFactory,
NullLogger<CosmosQueueClient>.Instance);

_cosmosFhirOperationDataStore = new CosmosFhirOperationDataStore(
_queueClient,
Expand Down
Loading