diff --git a/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/Reindex/ReindexJobWorkerTests.cs b/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/Reindex/ReindexJobWorkerTests.cs index 8a79d4f5bf..ec6d4649c1 100644 --- a/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/Reindex/ReindexJobWorkerTests.cs +++ b/src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/Reindex/ReindexJobWorkerTests.cs @@ -9,8 +9,10 @@ using System.Threading.Tasks; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; +using Microsoft.Health.Core.Features.Context; using Microsoft.Health.Extensions.DependencyInjection; using Microsoft.Health.Fhir.Core.Configs; +using Microsoft.Health.Fhir.Core.Features.Context; using Microsoft.Health.Fhir.Core.Features.Operations; using Microsoft.Health.Fhir.Core.Features.Operations.Reindex; using Microsoft.Health.Fhir.Core.Features.Operations.Reindex.Models; @@ -59,6 +61,7 @@ public ReindexJobWorkerTests() Options.Create(_reindexJobConfiguration), _reindexJobTask.CreateMockScopeProvider(), searchParameterOperations, + Substitute.For>(), NullLogger.Instance); _reindexJobWorker.Handle(new Messages.Search.SearchParametersInitializedNotification(), CancellationToken.None); diff --git a/src/Microsoft.Health.Fhir.Core/Features/Operations/Reindex/ReindexJobWorker.cs b/src/Microsoft.Health.Fhir.Core/Features/Operations/Reindex/ReindexJobWorker.cs index 9f46990f61..c288d3e3a5 100644 --- a/src/Microsoft.Health.Fhir.Core/Features/Operations/Reindex/ReindexJobWorker.cs +++ b/src/Microsoft.Health.Fhir.Core/Features/Operations/Reindex/ReindexJobWorker.cs @@ -12,8 +12,11 @@ using MediatR; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; +using Microsoft.Extensions.Primitives; +using Microsoft.Health.Core.Features.Context; using Microsoft.Health.Extensions.DependencyInjection; using Microsoft.Health.Fhir.Core.Configs; +using Microsoft.Health.Fhir.Core.Features.Context; using Microsoft.Health.Fhir.Core.Features.Operations.Reindex.Models; using Microsoft.Health.Fhir.Core.Features.Search.Parameters; using Microsoft.Health.Fhir.Core.Messages.Search; @@ -29,6 +32,7 @@ public class ReindexJobWorker : INotificationHandler _reindexJobTaskFactory; private readonly ISearchParameterOperations _searchParameterOperations; + private readonly RequestContextAccessor _contextAccessor; private readonly ILogger _logger; private bool _searchParametersInitialized = false; @@ -37,19 +41,15 @@ public ReindexJobWorker( IOptions reindexJobConfiguration, IScopeProvider reindexJobTaskFactory, ISearchParameterOperations searchParameterOperations, + RequestContextAccessor contextAccessor, ILogger logger) { - EnsureArg.IsNotNull(fhirOperationDataStoreFactory, nameof(fhirOperationDataStoreFactory)); - EnsureArg.IsNotNull(reindexJobConfiguration?.Value, nameof(reindexJobConfiguration)); - EnsureArg.IsNotNull(reindexJobTaskFactory, nameof(reindexJobTaskFactory)); - EnsureArg.IsNotNull(searchParameterOperations, nameof(searchParameterOperations)); - EnsureArg.IsNotNull(logger, nameof(logger)); - - _fhirOperationDataStoreFactory = fhirOperationDataStoreFactory; - _reindexJobConfiguration = reindexJobConfiguration.Value; - _reindexJobTaskFactory = reindexJobTaskFactory; - _searchParameterOperations = searchParameterOperations; - _logger = logger; + _fhirOperationDataStoreFactory = EnsureArg.IsNotNull(fhirOperationDataStoreFactory, nameof(fhirOperationDataStoreFactory)); + _reindexJobConfiguration = EnsureArg.IsNotNull(reindexJobConfiguration?.Value, nameof(reindexJobConfiguration)); + _reindexJobTaskFactory = EnsureArg.IsNotNull(reindexJobTaskFactory, nameof(reindexJobTaskFactory)); + _searchParameterOperations = EnsureArg.IsNotNull(searchParameterOperations, nameof(searchParameterOperations)); + _contextAccessor = EnsureArg.IsNotNull(contextAccessor, nameof(contextAccessor)); + _logger = EnsureArg.IsNotNull(logger, nameof(logger)); } public async Task ExecuteAsync(CancellationToken cancellationToken) @@ -60,6 +60,18 @@ public async Task ExecuteAsync(CancellationToken cancellationToken) { if (_searchParametersInitialized) { + // Create a background task context to trigger the correct retry policy. + var fhirRequestContext = new FhirRequestContext( + method: nameof(ReindexJobWorker), + uriString: string.Empty, + baseUriString: string.Empty, + correlationId: string.Empty, + requestHeaders: new Dictionary(), + responseHeaders: new Dictionary()) + { + IsBackgroundTask = true, + }; + // Check for any changes to Search Parameters try { diff --git a/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/CosmosQueueClient.cs b/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/CosmosQueueClient.cs index 379b8168a0..6bd358c490 100644 --- a/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/CosmosQueueClient.cs +++ b/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/Queues/CosmosQueueClient.cs @@ -12,15 +12,21 @@ using System.Threading.Tasks; using EnsureThat; using Microsoft.Azure.Cosmos; +using Microsoft.Build.Framework; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Primitives; using Microsoft.Health.Abstractions.Exceptions; using Microsoft.Health.Core; using Microsoft.Health.Core.Extensions; +using Microsoft.Health.Core.Features.Context; using Microsoft.Health.Extensions.DependencyInjection; using Microsoft.Health.Fhir.Core.Extensions; +using Microsoft.Health.Fhir.Core.Features.Context; using Microsoft.Health.Fhir.CosmosDb.Core.Features.Storage; using Microsoft.Health.Fhir.CosmosDb.Features.Queries; using Microsoft.Health.JobManagement; using Polly; +using Polly.Retry; namespace Microsoft.Health.Fhir.CosmosDb.Features.Storage.Queues; @@ -29,20 +35,24 @@ public class CosmosQueueClient : IQueueClient private readonly Func> _containerFactory; private readonly ICosmosQueryFactory _queryFactory; private readonly ICosmosDbDistributedLockFactory _distributedLockFactory; - private static readonly AsyncPolicy _retryPolicy = Policy - .Handle(ex => ex.StatusCode == HttpStatusCode.PreconditionFailed) - .Or(ex => ex.StatusCode == HttpStatusCode.TooManyRequests) - .Or() - .WaitAndRetryAsync(5, _ => TimeSpan.FromMilliseconds(RandomNumberGenerator.GetInt32(100, 1000))); + private readonly RetryExceptionPolicyFactory _retryExceptionPolicyFactory; + private readonly ILogger _logger; + private readonly AsyncPolicy _retryPolicy; public CosmosQueueClient( Func> containerFactory, ICosmosQueryFactory queryFactory, - ICosmosDbDistributedLockFactory distributedLockFactory) + ICosmosDbDistributedLockFactory distributedLockFactory, + RetryExceptionPolicyFactory retryExceptionPolicyFactor, + ILogger logger) { _containerFactory = EnsureArg.IsNotNull(containerFactory, nameof(containerFactory)); _queryFactory = EnsureArg.IsNotNull(queryFactory, nameof(queryFactory)); _distributedLockFactory = EnsureArg.IsNotNull(distributedLockFactory, nameof(distributedLockFactory)); + _retryExceptionPolicyFactory = EnsureArg.IsNotNull(retryExceptionPolicyFactor, nameof(retryExceptionPolicyFactor)); + _logger = EnsureArg.IsNotNull(logger, nameof(logger)); + + _retryPolicy = _retryExceptionPolicyFactory.BackgroundWorkerRetryPolicy; } public bool IsInitialized() => true; diff --git a/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/RetryExceptionPolicyFactory.cs b/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/RetryExceptionPolicyFactory.cs index 2850384550..f9f93e3738 100644 --- a/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/RetryExceptionPolicyFactory.cs +++ b/src/Microsoft.Health.Fhir.CosmosDb/Features/Storage/RetryExceptionPolicyFactory.cs @@ -54,6 +54,8 @@ public AsyncPolicy RetryPolicy } } + public AsyncPolicy BackgroundWorkerRetryPolicy => _backgroundJobRetryPolicy; + private static AsyncRetryPolicy CreateExtendedRetryPolicy(int maxRetries, int maxWaitTimeInSeconds) { return Policy.Handle() diff --git a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Features/Operations/Reindex/ReindexJobTests.cs b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Features/Operations/Reindex/ReindexJobTests.cs index 7e710831cf..e7f150caec 100644 --- a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Features/Operations/Reindex/ReindexJobTests.cs +++ b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Features/Operations/Reindex/ReindexJobTests.cs @@ -899,6 +899,7 @@ private async Task SetUpForReindexing(CreateReindexReques Options.Create(_jobConfiguration), InitializeReindexJobTask().CreateMockScopeProvider(), _searchParameterOperations, + Substitute.For>(), NullLogger.Instance); await _reindexJobWorker.Handle(new SearchParametersInitializedNotification(), CancellationToken.None); diff --git a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/CosmosDbFhirStorageTestsFixture.cs b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/CosmosDbFhirStorageTestsFixture.cs index 8cd2204e52..d264ab0ee9 100644 --- a/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/CosmosDbFhirStorageTestsFixture.cs +++ b/test/Microsoft.Health.Fhir.Shared.Tests.Integration/Persistence/CosmosDbFhirStorageTestsFixture.cs @@ -232,7 +232,9 @@ public virtual async Task InitializeAsync() _queueClient = new CosmosQueueClient( () => _container.CreateMockScope(), new CosmosQueryFactory(Substitute.For(), Substitute.For()), - new CosmosDbDistributedLockFactory(() => _container.CreateMockScope(), NullLogger.Instance)); + new CosmosDbDistributedLockFactory(() => _container.CreateMockScope(), NullLogger.Instance), + retryExceptionPolicyFactory, + NullLogger.Instance); _cosmosFhirOperationDataStore = new CosmosFhirOperationDataStore( _queueClient,