Skip to content

Fix $bulk-delete bugs with custom search parameter #4964

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 38 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
aa190d4
Fixing $bulk-delete bugs.
tarunmathew12 May 1, 2025
62daa03
Fixing exception types.
tarunmathew12 May 1, 2025
53acd9b
Addressing bot's comment
tarunmathew12 May 1, 2025
d4ae388
Removing the redundant grouping/summing.
tarunmathew12 May 2, 2025
21b8701
Merge branch 'main' into personal/v-tmathew/bulk-delete-bug
tarunmathew12 May 5, 2025
179274e
Fixing some test failures.
tarunmathew12 May 5, 2025
6917021
Adding some debug statements
tarunmathew12 May 5, 2025
4536421
Debugging flaky tests.
tarunmathew12 May 5, 2025
d4a5874
Merge branch 'main' into personal/v-tmathew/bulk-delete-bug
tarunmathew12 May 6, 2025
af3484b
More changes for investigating the flaky tests.
tarunmathew12 May 6, 2025
ff8250c
More change for flaky test investigation.
tarunmathew12 May 6, 2025
19439a2
Fixing the flaky tests.
tarunmathew12 May 6, 2025
8e982d9
Add more code to ensure each operation passes.
tarunmathew12 May 7, 2025
06beb09
Merge branch 'main' into personal/v-tmathew/bulk-delete-bug
tarunmathew12 May 8, 2025
8e54cb9
Renaming helper functions.
tarunmathew12 May 8, 2025
942c800
Investigating flaky tests.
tarunmathew12 May 8, 2025
95e9407
Adding debug statements.
tarunmathew12 May 8, 2025
3bd595d
Merge branch 'main' into personal/v-tmathew/bulk-delete-bug
tarunmathew12 May 12, 2025
026b92a
Investigating flaky tests.
tarunmathew12 May 12, 2025
c6067b9
Merge branch 'main' into personal/v-tmathew/bulk-delete-bug
tarunmathew12 May 14, 2025
5814c29
Temp change.
tarunmathew12 May 14, 2025
ce0000f
Merge branch 'main' into personal/v-tmathew/bulk-delete-bug
tarunmathew12 May 14, 2025
835a8af
Flaky test investigation.
tarunmathew12 May 14, 2025
bb6dee4
Merge branch 'main' into personal/v-tmathew/bulk-delete-bug
tarunmathew12 May 15, 2025
cebd7cf
Flaky test investigation.
tarunmathew12 May 15, 2025
6d65aab
Merge branch 'main' into personal/v-tmathew/bulk-delete-bug
tarunmathew12 May 19, 2025
792169b
Adding exception handling in extracting url from sp.
tarunmathew12 May 19, 2025
27088a9
Fixing a typo.
tarunmathew12 May 19, 2025
17dc3da
Adding retry on SP update by bulk delete.
tarunmathew12 May 20, 2025
66d833e
Merge branch 'main' into personal/v-tmathew/bulk-delete-bug
tarunmathew12 May 21, 2025
ea18303
Adding more logging for debugging the flaky test.
tarunmathew12 May 21, 2025
220dacf
Fix delete search parameter function.
tarunmathew12 May 21, 2025
b205575
Merge branch 'main' into personal/v-tmathew/bulk-delete-bug
tarunmathew12 May 22, 2025
6c4a4c6
fixing merge conflict.
tarunmathew12 May 22, 2025
228156d
Removing the log statements for debugging.
tarunmathew12 May 22, 2025
98a8a1a
Fixing merge conflicts.
tarunmathew12 May 22, 2025
5c9c27f
Cleaning up redundant code.
tarunmathew12 May 22, 2025
59cb78a
Addressing bot comment.
tarunmathew12 May 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
using System.Threading;
using System.Threading.Tasks;
using MediatR;
using Microsoft.Extensions.Logging;
using Microsoft.Health.Core.Features.Context;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Features.Context;
using Microsoft.Health.Fhir.Core.Features.Operations;
using Microsoft.Health.Fhir.Core.Features.Operations.BulkDelete;
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Core.Features.Search;
using Microsoft.Health.Fhir.Core.Messages.Delete;
using Microsoft.Health.Fhir.Core.Models;
using Microsoft.Health.Fhir.Core.UnitTests.Extensions;
using Microsoft.Health.Fhir.Tests.Common;
using Microsoft.Health.JobManagement;
Expand All @@ -43,7 +44,14 @@ public BulkDeleteProcessingJobTests()
.Returns(Task.FromResult(new SearchResult(5, new List<Tuple<string, string>>())));
_queueClient = Substitute.For<IQueueClient>();
_deleter = Substitute.For<IDeletionService>();
_processingJob = new BulkDeleteProcessingJob(_deleter.CreateMockScopeFactory(), Substitute.For<RequestContextAccessor<IFhirRequestContext>>(), Substitute.For<IMediator>(), _searchService.CreateMockScopeFactory(), _queueClient);
_processingJob = new BulkDeleteProcessingJob(
_deleter.CreateMockScopeFactory(),
Substitute.For<RequestContextAccessor<IFhirRequestContext>>(),
Substitute.For<IMediator>(),
_searchService.CreateMockScopeFactory(),
_queueClient,
Substitute.For<IModelInfoProvider>(),
Substitute.For<ILogger<BulkDeleteProcessingJob>>());
}

[Fact]
Expand All @@ -58,11 +66,8 @@ public async Task GivenProcessingJob_WhenJobIsRun_ThenResourcesAreDeleted()
Definition = JsonConvert.SerializeObject(definition),
};

var substituteResults = new Dictionary<string, long>();
substituteResults.Add("Patient", 3);

_deleter.DeleteMultipleAsync(Arg.Any<ConditionalDeleteResourceRequest>(), Arg.Any<CancellationToken>())
.Returns(args => substituteResults);
.Returns(args => CreateResources(KnownResourceTypes.Patient, 3));

var result = JsonConvert.DeserializeObject<BulkDeleteResult>(await _processingJob.ExecuteAsync(jobInfo, CancellationToken.None));
Assert.Single(result.ResourcesDeleted);
Expand All @@ -83,11 +88,8 @@ public async Task GivenProcessingJob_WhenJobIsRunWithMultipleResourceTypes_ThenF
Definition = JsonConvert.SerializeObject(definition),
};

var substituteResults = new Dictionary<string, long>();
substituteResults.Add("Patient", 3);

_deleter.DeleteMultipleAsync(Arg.Any<ConditionalDeleteResourceRequest>(), Arg.Any<CancellationToken>())
.Returns(args => substituteResults);
.Returns(args => CreateResources(KnownResourceTypes.Patient, 3));

var result = JsonConvert.DeserializeObject<BulkDeleteResult>(await _processingJob.ExecuteAsync(jobInfo, CancellationToken.None));
Assert.Single(result.ResourcesDeleted);
Expand All @@ -104,5 +106,27 @@ public async Task GivenProcessingJob_WhenJobIsRunWithMultipleResourceTypes_ThenF
var actualDefinition = JsonConvert.DeserializeObject<BulkDeleteDefinition>(definitions[0]);
Assert.Equal(2, actualDefinition.Type.SplitByOrSeparator().Count());
}

/// <summary>
/// Test helper that builds a list of <see cref="ResourceWrapper"/> instances of a single resource type.
/// </summary>
/// <param name="resourceType">The resource type name passed to every wrapper.</param>
/// <param name="count">How many wrappers to create; zero or a negative value yields an empty list.</param>
/// <returns>A new list holding <paramref name="count"/> wrappers.</returns>
private List<ResourceWrapper> CreateResources(string resourceType, int count)
{
    var wrappers = new List<ResourceWrapper>();
    for (int remaining = count; remaining > 0; remaining--)
    {
        // The first two arguments are fresh GUID strings (presumably the resource id and
        // version id - confirm against the ResourceWrapper constructor); everything the
        // tests do not need is left null.
        var wrapper = new ResourceWrapper(
            Guid.NewGuid().ToString(),
            Guid.NewGuid().ToString(),
            resourceType,
            null,
            null,
            DateTimeOffset.UtcNow,
            false,
            null,
            null,
            null);

        wrappers.Add(wrapper);
    }

    return wrappers;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -269,13 +269,15 @@

// find all derived resources from the list of base resources
var allResourceTypes = GetDerivedResourceTypes(searchParameterInfo.BaseResourceTypes);
foreach (var resourceType in allResourceTypes)
{
TypeLookup[resourceType].TryRemove(searchParameterInfo.Code, out var removedParam);
if (removedParam.Url != searchParameterInfo.Url)
if (TypeLookup[resourceType].TryRemove(searchParameterInfo.Code, out var removedParam))
{
_logger.LogError("Error, Search Param {RemovedParam} removed from Search Param Definition manager. It does not match deleted Search Param {Url}", removedParam.Url, url);
if (removedParam?.Url != searchParameterInfo.Url)
{
_logger.LogError("Error, Search Param {RemovedParam} removed from Search Param Definition manager. It does not match deleted Search Param {Url}", removedParam?.Url, url);
}
}

Check notice

Code scanning / CodeQL

Nested 'if' statements can be combined Note

These 'if' statements can be combined.
}

Check notice

Code scanning / CodeQL

Missed opportunity to use Where Note

This foreach loop
implicitly filters its target sequence
- consider filtering the sequence explicitly using '.Where(...)'.

if (calculateHash)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@
using System.Threading.Tasks;
using EnsureThat;
using MediatR;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Primitives;
using Microsoft.Health.Core.Features.Context;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Exceptions;
using Microsoft.Health.Fhir.Core.Extensions;
using Microsoft.Health.Fhir.Core.Features.Context;
using Microsoft.Health.Fhir.Core.Features.Operations.BulkDelete.Messages;
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Core.Features.Search;
using Microsoft.Health.Fhir.Core.Messages.Delete;
using Microsoft.Health.Fhir.Core.Models;
using Microsoft.Health.JobManagement;
using Newtonsoft.Json;

Expand All @@ -33,19 +36,25 @@
private readonly IMediator _mediator;
private readonly Func<IScoped<ISearchService>> _searchService;
private readonly IQueueClient _queueClient;
private readonly IModelInfoProvider _modelInfoProvider;
private readonly ILogger<BulkDeleteProcessingJob> _logger;

public BulkDeleteProcessingJob(
Func<IScoped<IDeletionService>> deleterFactory,
RequestContextAccessor<IFhirRequestContext> contextAccessor,
IMediator mediator,
Func<IScoped<ISearchService>> searchService,
IQueueClient queueClient)
IQueueClient queueClient,
IModelInfoProvider modelInfoProvider,
ILogger<BulkDeleteProcessingJob> logger)
{
_deleterFactory = EnsureArg.IsNotNull(deleterFactory, nameof(deleterFactory));
_contextAccessor = EnsureArg.IsNotNull(contextAccessor, nameof(contextAccessor));
_mediator = EnsureArg.IsNotNull(mediator, nameof(mediator));
_searchService = EnsureArg.IsNotNull(searchService, nameof(searchService));
_queueClient = EnsureArg.IsNotNull(queueClient, nameof(queueClient));
_modelInfoProvider = EnsureArg.IsNotNull(modelInfoProvider, nameof(modelInfoProvider));
_logger = EnsureArg.IsNotNull(logger, nameof(logger));
}

public async Task<string> ExecuteAsync(JobInfo jobInfo, CancellationToken cancellationToken)
Expand Down Expand Up @@ -73,14 +82,14 @@

_contextAccessor.RequestContext = fhirRequestContext;
var result = new BulkDeleteResult();
IDictionary<string, long> resourcesDeleted = new Dictionary<string, long>();
var resourcesDeleted = new List<ResourceWrapper>();
using IScoped<IDeletionService> deleter = _deleterFactory.Invoke();
Exception exception = null;
List<string> types = definition.Type.SplitByOrSeparator().ToList();

try
{
resourcesDeleted = await deleter.Value.DeleteMultipleAsync(
var deleteResult = await deleter.Value.DeleteMultipleAsync(
new ConditionalDeleteResourceRequest(
types[0],
(IReadOnlyList<Tuple<string, string>>)definition.SearchParameters,
Expand All @@ -90,23 +99,44 @@
versionType: definition.VersionType,
allowPartialSuccess: false), // Explicitly setting to call out that this can be changed in the future if we want to. Bulk delete offers the possibility of automatically rerunning the operation until it succeeds, fully automating the process.
cancellationToken);
resourcesDeleted.AddRange(deleteResult);
}
catch (IncompleteOperationException<IDictionary<string, long>> ex)
catch (IncompleteOperationException<List<ResourceWrapper>> ex)
{
resourcesDeleted = ex.PartialResults;
_logger.LogError(ex, "Deleting resources failed.");
resourcesDeleted.AddRange(ex.PartialResults);
result.Issues.Add(ex.Message);
exception = ex;
}
catch (Exception ex)
{
_logger.LogError(ex, "Deleting resources failed.");
throw;
}

foreach (var (key, value) in resourcesDeleted)
var deletedResourceCountMap = resourcesDeleted.GroupBy(x => x.ResourceTypeName).ToDictionary(x => x.Key, x => (long)x.Count());
foreach (var item in deletedResourceCountMap)
{
if (!result.ResourcesDeleted.TryAdd(key, value))
if (!result.ResourcesDeleted.TryAdd(item.Key, item.Value))
{
result.ResourcesDeleted[key] += value;
result.ResourcesDeleted[item.Key] += item.Value;
}
}

Check notice

Code scanning / CodeQL

Missed opportunity to use Where Note

This foreach loop
implicitly filters its target sequence
- consider filtering the sequence explicitly using '.Where(...)'.

await _mediator.Publish(new BulkDeleteMetricsNotification(jobInfo.Id, resourcesDeleted.Sum(resource => resource.Value)), cancellationToken);
try
{
var notification = new BulkDeleteMetricsNotification(jobInfo.Id, resourcesDeleted.Count)
{
Content = CreateNotificationContent(resourcesDeleted),
};

await _mediator.Publish(notification, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to create and publish the notification.");
throw;
}

if (exception != null)
{
Expand All @@ -132,5 +162,40 @@
_contextAccessor.RequestContext = existingFhirRequestContext;
}
}

/// <summary>
/// Builds the content attached to the bulk-delete notification: a JSON array holding the
/// "url" value of every deleted SearchParameter resource that still carries its raw resource.
/// </summary>
/// <param name="resources">The resources that were deleted by this job.</param>
/// <returns>
/// The serialized list of search parameter urls, or <c>null</c> when no url could be extracted.
/// </returns>
/// <remarks>
/// A failure to extract the url of a single resource is logged and that resource is skipped;
/// any other failure is logged and rethrown.
/// </remarks>
private string CreateNotificationContent(List<ResourceWrapper> resources)
{
    try
    {
        var searchParameterUrls = resources
            .Where(x => string.Equals(x.ResourceTypeName, KnownResourceTypes.SearchParameter, StringComparison.OrdinalIgnoreCase) && x.RawResource != null)
            .Select(x =>
            {
                try
                {
                    return _modelInfoProvider.ToTypedElement(x.RawResource).GetStringScalar("url");
                }
                catch (Exception ex)
                {
                    // Best effort: one unreadable resource must not prevent the others from being reported.
                    _logger.LogError(ex, "Failed to extract the url from the resource, '{ResourceId}'.", x.ResourceId);
                    return null;
                }
            })
            .Where(x => x != null)
            .ToList();

        _logger.LogInformation("Creating the notification content with {Count} search parameters.", searchParameterUrls.Count);

        return searchParameterUrls.Count > 0
            ? JsonConvert.SerializeObject(searchParameterUrls)
            : null;
    }
    catch (Exception ex)
    {
        // 'resources' may be the very thing that is null here, so do not dereference it unguarded:
        // a second exception thrown from this handler would hide the original one.
        _logger.LogError(ex, "Failed to create the notification content from {ResourceCount} deleted resources.", resources?.Count);
        throw;
    }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,11 @@ public BulkDeleteMetricsNotification(long id, long resourcesDeleted)
public long JobId { get; }

public OperationStatus Status { get; }

/// <summary>
/// Gets or sets the content associated with this notification (e.g. a list of deleted search parameter urls).
/// TODO: this should probably be a Stream, but MediatR does not appear to clone a notification before sending it to handlers.
/// </summary>
public object Content { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ public interface IDeletionService
{
public Task<ResourceKey> DeleteAsync(DeleteResourceRequest request, CancellationToken cancellationToken);

public Task<IDictionary<string, long>> DeleteMultipleAsync(ConditionalDeleteResourceRequest request, CancellationToken cancellationToken);
public Task<List<ResourceWrapper>> DeleteMultipleAsync(ConditionalDeleteResourceRequest request, CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,30 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using EnsureThat;
using Hl7.Fhir.ElementModel;
using Hl7.Fhir.Rest;
using MediatR;
using Microsoft.Extensions.Logging;
using Microsoft.Health.Extensions.DependencyInjection;
using Microsoft.Health.Fhir.Core.Exceptions;
using Microsoft.Health.Fhir.Core.Extensions;
using Microsoft.Health.Fhir.Core.Features.Definition;
using Microsoft.Health.Fhir.Core.Features.Definition.BundleWrappers;
using Microsoft.Health.Fhir.Core.Features.Operations.BulkDelete.Messages;
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Core.Features.Search.Registry;
using Microsoft.Health.Fhir.Core.Models;

namespace Microsoft.Health.Fhir.Core.Features.Search.Parameters
{
public class SearchParameterOperations : ISearchParameterOperations
public class SearchParameterOperations : ISearchParameterOperations, INotificationHandler<BulkDeleteMetricsNotification>
{
private const int DefaultDeleteTasksPerPage = 5;

private readonly ISearchParameterDefinitionManager _searchParameterDefinitionManager;
private readonly SearchParameterStatusManager _searchParameterStatusManager;
private readonly IModelInfoProvider _modelInfoProvider;
Expand Down Expand Up @@ -283,6 +289,35 @@
cancellationToken);
}

/// <summary>
/// Handles a bulk-delete notification. When the notification content is a JSON array of
/// search parameter urls, those search parameters are passed to
/// <c>DeleteSearchParametersAsync</c>; otherwise the notification is ignored.
/// </summary>
/// <param name="notification">The bulk-delete notification; its content may be null or empty.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that completes when the notification has been processed.</returns>
/// <exception cref="JsonException">The content is not valid JSON for a list of strings.</exception>
public async Task Handle(BulkDeleteMetricsNotification notification, CancellationToken cancellationToken)
{
    var content = notification?.Content?.ToString();
    if (string.IsNullOrWhiteSpace(content))
    {
        _logger.LogInformation("Ignoring the notification as its content is empty.");
        return;
    }

    List<string> urls;
    try
    {
        // Only the deserialization is guarded so that the error message below is never
        // attributed to a failure coming from the delete path.
        urls = JsonSerializer.Deserialize<List<string>>(content);
    }
    catch (JsonException ex)
    {
        _logger.LogError(ex, "Failed to deserialize the notification content.");
        throw;
    }

    // Deserialize returns null for the JSON literal "null", so the list itself must be checked too.
    if (urls?.Any(x => !string.IsNullOrWhiteSpace(x)) == true)
    {
        await DeleteSearchParametersAsync(urls, DefaultDeleteTasksPerPage, cancellationToken);
    }
    else
    {
        _logger.LogInformation("Ignoring the notification as its content doesn't have any search parameter Uri.");
    }
}

private void DeleteSearchParameter(string url)
{
try
Expand Down Expand Up @@ -316,5 +351,51 @@

return null;
}

/// <summary>
/// Marks the search parameters identified by <paramref name="urls"/> as
/// <see cref="SearchParameterStatus.PendingDelete"/>, first through the status manager and
/// then in the search parameter definition manager, processing them one page at a time.
/// </summary>
/// <param name="urls">Urls of the search parameters to mark; matched case-insensitively. Null or empty is a no-op.</param>
/// <param name="pageSize">Maximum number of statuses updated per call to the status manager; must be positive.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that completes when every page has been attempted.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="pageSize"/> is zero or negative.</exception>
/// <remarks>
/// Search parameters that are already PendingDelete or Deleted are skipped. A failure while
/// updating one page is logged and the remaining pages are still attempted.
/// </remarks>
private async Task DeleteSearchParametersAsync(List<string> urls, int pageSize, CancellationToken cancellationToken)
{
    if (pageSize <= 0)
    {
        // A non-positive page size would yield empty pages forever and never advance the loop below.
        throw new ArgumentOutOfRangeException(nameof(pageSize), pageSize, "Page size must be greater than zero.");
    }

    if (urls == null || urls.Count == 0)
    {
        return;
    }

    var urlSet = urls.ToHashSet(StringComparer.OrdinalIgnoreCase);
    var allSearchParameterStatuses = await _searchParameterStatusManager.GetAllSearchParameterStatus(cancellationToken);
    var searchParameterStatusesToUpdate = allSearchParameterStatuses
        .Where(x => urlSet.Contains(x.Uri.OriginalString)
            && x.Status != SearchParameterStatus.PendingDelete
            && x.Status != SearchParameterStatus.Deleted)
        .ToList();

    _logger.LogInformation(
        "Updating the status of {Count} search parameters (out of {UrlCount} urls) to PendingDelete...",
        searchParameterStatusesToUpdate.Count,
        urls.Count);

    for (var offset = 0; offset < searchParameterStatusesToUpdate.Count; offset += pageSize)
    {
        var statusesInPage = searchParameterStatusesToUpdate.Skip(offset).Take(pageSize).ToList();
        var urlsString = string.Join(Environment.NewLine, statusesInPage.Select(x => x.Uri.OriginalString));

        try
        {
            foreach (var status in statusesInPage)
            {
                status.Status = SearchParameterStatus.PendingDelete;
                status.LastUpdated = Clock.UtcNow;
            }

            _logger.LogInformation("Updating the status of search parameters:{NewLine}{Urls}", Environment.NewLine, urlsString);
            await _searchParameterStatusManager.UpdateSearchParameterStatusAsync(statusesInPage, cancellationToken);

            // Update the definition manager only after the status manager call has succeeded.
            foreach (var status in statusesInPage)
            {
                _searchParameterDefinitionManager.UpdateSearchParameterStatus(
                    status.Uri.OriginalString,
                    SearchParameterStatus.PendingDelete);
            }
        }
        catch (Exception ex)
        {
            // Note: ignore the exception and continue updating the rest.
            _logger.LogError(ex, "Failed to update the status of search parameters:{NewLine}{Urls}", Environment.NewLine, urlsString);
        }
    }
}
}
}
Loading
Loading