Skip to content

Commit

Permalink
Moved cosmos job hosting to use unified retry logic
Browse files Browse the repository at this point in the history
  • Loading branch information
mikaelweave committed Jan 14, 2025
1 parent bc6b453 commit 141246f
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Microsoft.Health.Core.Features.Context;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Configs;
using Microsoft.Health.Fhir.Core.Features.Context;
using Microsoft.Health.Fhir.Core.Features.Operations;
using Microsoft.Health.Fhir.Core.Features.Operations.Reindex;
using Microsoft.Health.Fhir.Core.Features.Operations.Reindex.Models;
Expand Down Expand Up @@ -59,6 +61,7 @@ public ReindexJobWorkerTests()
Options.Create(_reindexJobConfiguration),
_reindexJobTask.CreateMockScopeProvider(),
searchParameterOperations,
Substitute.For<RequestContextAccessor<IFhirRequestContext>>(),
NullLogger<ReindexJobWorker>.Instance);

_reindexJobWorker.Handle(new Messages.Search.SearchParametersInitializedNotification(), CancellationToken.None);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
using MediatR;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Primitives;
using Microsoft.Health.Core.Features.Context;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Configs;
using Microsoft.Health.Fhir.Core.Features.Context;
using Microsoft.Health.Fhir.Core.Features.Operations.Reindex.Models;
using Microsoft.Health.Fhir.Core.Features.Search.Parameters;
using Microsoft.Health.Fhir.Core.Messages.Search;
Expand All @@ -29,6 +32,7 @@ public class ReindexJobWorker : INotificationHandler<SearchParametersInitialized
private readonly ReindexJobConfiguration _reindexJobConfiguration;
private readonly IScopeProvider<IReindexJobTask> _reindexJobTaskFactory;
private readonly ISearchParameterOperations _searchParameterOperations;
private readonly RequestContextAccessor<IFhirRequestContext> _contextAccessor;
private readonly ILogger _logger;
private bool _searchParametersInitialized = false;

Expand All @@ -37,19 +41,15 @@ public ReindexJobWorker(
IOptions<ReindexJobConfiguration> reindexJobConfiguration,
IScopeProvider<IReindexJobTask> reindexJobTaskFactory,
ISearchParameterOperations searchParameterOperations,
RequestContextAccessor<IFhirRequestContext> contextAccessor,
ILogger<ReindexJobWorker> logger)
{
EnsureArg.IsNotNull(fhirOperationDataStoreFactory, nameof(fhirOperationDataStoreFactory));
EnsureArg.IsNotNull(reindexJobConfiguration?.Value, nameof(reindexJobConfiguration));
EnsureArg.IsNotNull(reindexJobTaskFactory, nameof(reindexJobTaskFactory));
EnsureArg.IsNotNull(searchParameterOperations, nameof(searchParameterOperations));
EnsureArg.IsNotNull(logger, nameof(logger));

_fhirOperationDataStoreFactory = fhirOperationDataStoreFactory;
_reindexJobConfiguration = reindexJobConfiguration.Value;
_reindexJobTaskFactory = reindexJobTaskFactory;
_searchParameterOperations = searchParameterOperations;
_logger = logger;
_fhirOperationDataStoreFactory = EnsureArg.IsNotNull(fhirOperationDataStoreFactory, nameof(fhirOperationDataStoreFactory));
_reindexJobConfiguration = EnsureArg.IsNotNull(reindexJobConfiguration?.Value, nameof(reindexJobConfiguration));
_reindexJobTaskFactory = EnsureArg.IsNotNull(reindexJobTaskFactory, nameof(reindexJobTaskFactory));
_searchParameterOperations = EnsureArg.IsNotNull(searchParameterOperations, nameof(searchParameterOperations));
_contextAccessor = EnsureArg.IsNotNull(contextAccessor, nameof(contextAccessor));
_logger = EnsureArg.IsNotNull(logger, nameof(logger));
}

public async Task ExecuteAsync(CancellationToken cancellationToken)
Expand All @@ -60,6 +60,18 @@ public async Task ExecuteAsync(CancellationToken cancellationToken)
{
if (_searchParametersInitialized)
{
// Create a background task context to trigger the correct retry policy.
var fhirRequestContext = new FhirRequestContext(
method: nameof(ReindexJobWorker),
uriString: string.Empty,
baseUriString: string.Empty,
correlationId: string.Empty,
requestHeaders: new Dictionary<string, StringValues>(),
responseHeaders: new Dictionary<string, StringValues>())
{
IsBackgroundTask = true,
};

Check warning

Code scanning / CodeQL

Useless assignment to local variable Warning

This assignment to
fhirRequestContext
is useless, since its value is never read.

// Check for any changes to Search Parameters
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,21 @@
using System.Threading.Tasks;
using EnsureThat;
using Microsoft.Azure.Cosmos;
using Microsoft.Build.Framework;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Primitives;
using Microsoft.Health.Abstractions.Exceptions;
using Microsoft.Health.Core;
using Microsoft.Health.Core.Extensions;
using Microsoft.Health.Core.Features.Context;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Extensions;
using Microsoft.Health.Fhir.Core.Features.Context;
using Microsoft.Health.Fhir.CosmosDb.Core.Features.Storage;
using Microsoft.Health.Fhir.CosmosDb.Features.Queries;
using Microsoft.Health.JobManagement;
using Polly;
using Polly.Retry;

namespace Microsoft.Health.Fhir.CosmosDb.Features.Storage.Queues;

Expand All @@ -29,20 +35,24 @@ public class CosmosQueueClient : IQueueClient
private readonly Func<IScoped<Container>> _containerFactory;
private readonly ICosmosQueryFactory _queryFactory;
private readonly ICosmosDbDistributedLockFactory _distributedLockFactory;
private static readonly AsyncPolicy _retryPolicy = Policy
.Handle<CosmosException>(ex => ex.StatusCode == HttpStatusCode.PreconditionFailed)
.Or<CosmosException>(ex => ex.StatusCode == HttpStatusCode.TooManyRequests)
.Or<RequestRateExceededException>()
.WaitAndRetryAsync(5, _ => TimeSpan.FromMilliseconds(RandomNumberGenerator.GetInt32(100, 1000)));
private readonly RetryExceptionPolicyFactory _retryExceptionPolicyFactory;
private readonly ILogger<CosmosQueueClient> _logger;
private readonly AsyncPolicy _retryPolicy;

public CosmosQueueClient(
Func<IScoped<Container>> containerFactory,
ICosmosQueryFactory queryFactory,
ICosmosDbDistributedLockFactory distributedLockFactory)
ICosmosDbDistributedLockFactory distributedLockFactory,
RetryExceptionPolicyFactory retryExceptionPolicyFactor,
ILogger<CosmosQueueClient> logger)
{
_containerFactory = EnsureArg.IsNotNull(containerFactory, nameof(containerFactory));
_queryFactory = EnsureArg.IsNotNull(queryFactory, nameof(queryFactory));
_distributedLockFactory = EnsureArg.IsNotNull(distributedLockFactory, nameof(distributedLockFactory));
_retryExceptionPolicyFactory = EnsureArg.IsNotNull(retryExceptionPolicyFactor, nameof(retryExceptionPolicyFactor));
_logger = EnsureArg.IsNotNull(logger, nameof(logger));

_retryPolicy = _retryExceptionPolicyFactory.BackgroundWorkerRetryPolicy;
}

public bool IsInitialized() => true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public AsyncPolicy RetryPolicy
}
}

public AsyncPolicy BackgroundWorkerRetryPolicy => _backgroundJobRetryPolicy;

private static AsyncRetryPolicy CreateExtendedRetryPolicy(int maxRetries, int maxWaitTimeInSeconds)
{
return Policy.Handle<RequestRateExceededException>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,7 @@ private async Task<CreateReindexResponse> SetUpForReindexing(CreateReindexReques
Options.Create(_jobConfiguration),
InitializeReindexJobTask().CreateMockScopeProvider(),
_searchParameterOperations,
Substitute.For<RequestContextAccessor<IFhirRequestContext>>(),
NullLogger<ReindexJobWorker>.Instance);

await _reindexJobWorker.Handle(new SearchParametersInitializedNotification(), CancellationToken.None);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ public virtual async Task InitializeAsync()
_queueClient = new CosmosQueueClient(
() => _container.CreateMockScope(),
new CosmosQueryFactory(Substitute.For<ICosmosResponseProcessor>(), Substitute.For<ICosmosQueryLogger>()),
new CosmosDbDistributedLockFactory(() => _container.CreateMockScope(), NullLogger<CosmosDbDistributedLock>.Instance));
new CosmosDbDistributedLockFactory(() => _container.CreateMockScope(), NullLogger<CosmosDbDistributedLock>.Instance),
retryExceptionPolicyFactory,
NullLogger<CosmosQueueClient>.Instance);

_cosmosFhirOperationDataStore = new CosmosFhirOperationDataStore(
_queueClient,
Expand Down

0 comments on commit 141246f

Please sign in to comment.