Skip to content

Commit adf30d0

Browse files
authored
Use DB time for background job time limit (#5560)
1 parent 69b2e31 commit adf30d0

16 files changed

Lines changed: 151 additions & 197 deletions

File tree

src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkDelete/CreateBulkDeleteHandlerTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public async Task GivenBulkDeleteRequest_WhenJobCreationRequested_ThenJobIsCreat
8181
Assert.Equal(_testUrl, definition.Url);
8282
Assert.Equal(_testUrl, definition.BaseUrl);
8383
Assert.Equal(DeleteOperation.HardDelete, definition.DeleteOperation);
84-
Assert.Equal(searchParams.Count + 1, definition.SearchParameters.Count); // Adds the max time
84+
Assert.Equal(searchParams.Count, definition.SearchParameters.Count);
8585

8686
return new List<JobInfo>()
8787
{

src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkUpdate/BulkUpdateOrchestratorJobTests.cs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public async Task GivenBulkUpdateJob_WhenSearchParameterIsNotGivenOrAreAllowedAn
6767
{
6868
GroupId = 1,
6969
Definition = JsonConvert.SerializeObject(definition),
70-
CreateDate = DateTime.Now,
70+
CreateDate = DateTime.UtcNow,
7171
};
7272

7373
await _orchestratorJob.ExecuteAsync(jobInfo, CancellationToken.None);
@@ -149,7 +149,7 @@ public async Task GivenBulkUpdateJob_WhenSearchParameterIsNotGivenOrAreAllowedAn
149149
{
150150
GroupId = 1,
151151
Definition = JsonConvert.SerializeObject(definition),
152-
CreateDate = DateTime.Now,
152+
CreateDate = DateTime.UtcNow,
153153
};
154154

155155
await _orchestratorJob.ExecuteAsync(jobInfo, CancellationToken.None);
@@ -199,7 +199,7 @@ public async Task GivenBulkUpdateJob_WhenSearchParameterIsGivenAndIsParallelIsTr
199199
{
200200
GroupId = 1,
201201
Definition = JsonConvert.SerializeObject(definition),
202-
CreateDate = DateTime.Now,
202+
CreateDate = DateTime.UtcNow,
203203
};
204204

205205
await _orchestratorJob.ExecuteAsync(jobInfo, CancellationToken.None);
@@ -283,7 +283,7 @@ public async Task GivenBulkUpdateJob_WhenSearchParameterIsGivenAndIsParallelIsTr
283283
GroupId = 1,
284284
Id = 1,
285285
Definition = JsonConvert.SerializeObject(definition),
286-
CreateDate = DateTime.Now,
286+
CreateDate = DateTime.UtcNow,
287287
};
288288

289289
await _orchestratorJob.ExecuteAsync(jobInfo, CancellationToken.None);
@@ -371,7 +371,7 @@ public async Task GivenBulkUpdateJob_WhenSearchParameterIsGivenAndIsParallelIsTr
371371
GroupId = 1,
372372
Id = 1,
373373
Definition = JsonConvert.SerializeObject(definition),
374-
CreateDate = DateTime.Now,
374+
CreateDate = DateTime.UtcNow,
375375
};
376376

377377
await _orchestratorJob.ExecuteAsync(jobInfo, CancellationToken.None);
@@ -417,7 +417,7 @@ public async Task GivenBulkUpdateJob_WhenSearchParameterIsGivenAndIsParallelIsTr
417417
{
418418
GroupId = 1,
419419
Definition = JsonConvert.SerializeObject(definition),
420-
CreateDate = DateTime.Now,
420+
CreateDate = DateTime.UtcNow,
421421
};
422422

423423
await _orchestratorJob.ExecuteAsync(jobInfo, CancellationToken.None);
@@ -502,7 +502,7 @@ public async Task GivenBulkUpdateJob_WhenSearchParameterIsGivenAndIsParallelIsTr
502502
{
503503
GroupId = 1,
504504
Definition = JsonConvert.SerializeObject(definition),
505-
CreateDate = DateTime.Now,
505+
CreateDate = DateTime.UtcNow,
506506
};
507507

508508
await orchestratorJobWithTruncatedIssue.ExecuteAsync(jobInfo, CancellationToken.None);
@@ -575,7 +575,7 @@ public async Task GivenBulkUpdateJob_WhenSearchParameterIsGivenAndIsParallelIsTr
575575
{
576576
GroupId = 1,
577577
Definition = JsonConvert.SerializeObject(definition),
578-
CreateDate = DateTime.Now,
578+
CreateDate = DateTime.UtcNow,
579579
};
580580

581581
await orchestratorJobWithTruncatedIssue.ExecuteAsync(jobInfo, CancellationToken.None);
@@ -654,7 +654,7 @@ public async Task GivenBulkUpdateJob_WhenIsParallelIsFalseWithSearchParameter_Th
654654
{
655655
GroupId = 1,
656656
Definition = JsonConvert.SerializeObject(definition),
657-
CreateDate = DateTime.Now,
657+
CreateDate = DateTime.UtcNow,
658658
};
659659

660660
await _orchestratorJob.ExecuteAsync(jobInfo, CancellationToken.None);
@@ -704,7 +704,7 @@ public async Task GivenBulkUpdateJob_WhenIsParallelIsFalseWithoutSearchParameter
704704
{
705705
GroupId = 1,
706706
Definition = JsonConvert.SerializeObject(definition),
707-
CreateDate = DateTime.Now,
707+
CreateDate = DateTime.UtcNow,
708708
};
709709

710710
await _orchestratorJob.ExecuteAsync(jobInfo, CancellationToken.None);
@@ -736,7 +736,7 @@ public async Task GivenBulkUpdateJob_WhenIsParallelIsTrueWithoutSearchParamAndGe
736736
{
737737
GroupId = 1,
738738
Definition = JsonConvert.SerializeObject(definition),
739-
CreateDate = DateTime.Now,
739+
CreateDate = DateTime.UtcNow,
740740
};
741741

742742
// Act
@@ -769,7 +769,7 @@ public async Task GivenBulkUpdateJob_WhenIsParallelIsTrueWithSearchParamAndSearc
769769
{
770770
GroupId = 1,
771771
Definition = JsonConvert.SerializeObject(definition),
772-
CreateDate = DateTime.Now,
772+
CreateDate = DateTime.UtcNow,
773773
};
774774

775775
// Act
@@ -797,7 +797,7 @@ public async Task GivenBulkUpdateJob_WhenQueueClientThrowsException_ThenExceptio
797797
{
798798
GroupId = 1,
799799
Definition = JsonConvert.SerializeObject(definition),
800-
CreateDate = DateTime.Now,
800+
CreateDate = DateTime.UtcNow,
801801
};
802802

803803
await Assert.ThrowsAsync<InvalidOperationException>(async () => await _orchestratorJob.ExecuteAsync(jobInfo, CancellationToken.None));
@@ -817,7 +817,7 @@ public async Task GivenBulkUpdateJob_WhenSearchServiceThrowsException_ThenExcept
817817
{
818818
GroupId = 1,
819819
Definition = JsonConvert.SerializeObject(definition),
820-
CreateDate = DateTime.Now,
820+
CreateDate = DateTime.UtcNow,
821821
};
822822

823823
await Assert.ThrowsAsync<InvalidOperationException>(async () => await _orchestratorJob.ExecuteAsync(jobInfo, CancellationToken.None));
@@ -836,7 +836,7 @@ public async Task GivenBulkUpdateJob_WhenJobDefinitionIsMalformed_ThenExceptionI
836836
{
837837
GroupId = 1,
838838
Definition = "not a valid json",
839-
CreateDate = DateTime.Now,
839+
CreateDate = DateTime.UtcNow,
840840
};
841841

842842
await Assert.ThrowsAsync<JsonReaderException>(async () => await _orchestratorJob.ExecuteAsync(jobInfo, CancellationToken.None));

src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/BulkUpdate/CreateBulkUpdateHandlerTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public async Task GivenBulkUpdateRequestWithVariousSearchParams_WhenJobCreationR
8080
var definition = JsonConvert.DeserializeObject<BulkUpdateDefinition>(args.ArgAt<string[]>(1)[0]);
8181
Assert.Equal(_testUrl, definition.Url);
8282
Assert.Equal(_testUrl, definition.BaseUrl);
83-
Assert.Equal((searchParams?.Count ?? 0) + 1, definition.SearchParameters.Count);
83+
Assert.Equal(searchParams?.Count ?? 0, definition.SearchParameters.Count);
8484
return new List<JobInfo>()
8585
{
8686
new JobInfo()
@@ -114,7 +114,7 @@ public async Task GivenBulkUpdateRequest_WhenResourceTypeIsNull_ThenJobIsCreated
114114
var definition = JsonConvert.DeserializeObject<BulkUpdateDefinition>(args.ArgAt<string[]>(1)[0]);
115115
Assert.Equal(_testUrl, definition.Url);
116116
Assert.Equal(_testUrl, definition.BaseUrl);
117-
Assert.Equal(searchParams.Count + 1, definition.SearchParameters.Count);
117+
Assert.Equal(searchParams.Count, definition.SearchParameters.Count);
118118
Assert.Null(definition.Type);
119119

120120
return new List<JobInfo>()
@@ -143,7 +143,7 @@ public async Task GivenBulkUpdateRequest_WhenSearchParamsIsNull_ThenJobIsCreated
143143
var definition = JsonConvert.DeserializeObject<BulkUpdateDefinition>(args.ArgAt<string[]>(1)[0]);
144144
Assert.Equal(_testUrl, definition.Url);
145145
Assert.Equal(_testUrl, definition.BaseUrl);
146-
Assert.Single(definition.SearchParameters); // Will have _lastUpdated parameter added by the handler
146+
Assert.Empty(definition.SearchParameters);
147147

148148
return new List<JobInfo>()
149149
{

src/Microsoft.Health.Fhir.Core.UnitTests/Features/Operations/Export/ExportJobTaskTests.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2291,6 +2291,7 @@ private ExportJobRecord CreateExportJobRecord(
22912291
hash,
22922292
_exportJobConfiguration.RollingFileSizeInMB,
22932293
since: since,
2294+
till: new PartialDateTime(Clock.UtcNow),
22942295
groupId: groupId,
22952296
storageAccountConnectionHash: storageAccountConnectionHash,
22962297
storageAccountUri: storageAccountUri == null ? _exportJobConfiguration.StorageAccountUri : storageAccountUri,

src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteOrchestratorJob.cs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
using Hl7.Fhir.Rest;
1313
using Microsoft.Health.Extensions.DependencyInjection;
1414
using Microsoft.Health.Fhir.Core.Features.Search;
15+
using Microsoft.Health.Fhir.Core.Models;
1516
using Microsoft.Health.JobManagement;
1617

1718
namespace Microsoft.Health.Fhir.Core.Features.Operations.BulkDelete
@@ -47,11 +48,11 @@ public async Task<string> ExecuteAsync(JobInfo jobInfo, CancellationToken cancel
4748
{
4849
IReadOnlyList<string> resourceTypes = await searchService.Value.GetUsedResourceTypes(cancellationToken);
4950

50-
processingDefinition = await CreateProcessingDefinition(definition, searchService.Value, new List<string>(resourceTypes), cancellationToken);
51+
processingDefinition = await CreateProcessingDefinition(jobInfo, definition, searchService.Value, new List<string>(resourceTypes), cancellationToken);
5152
}
5253
else
5354
{
54-
processingDefinition = await CreateProcessingDefinition(definition, searchService.Value, new List<string>() { definition.Type }, cancellationToken);
55+
processingDefinition = await CreateProcessingDefinition(jobInfo, definition, searchService.Value, new List<string>() { definition.Type }, cancellationToken);
5556
}
5657

5758
// Processing Definition can be null if bulk delete was requested on criteria that didn't match any resources. If there is nothing to delete, just finish the job.
@@ -65,21 +66,25 @@ public async Task<string> ExecuteAsync(JobInfo jobInfo, CancellationToken cancel
6566

6667
// Creates a bulk delete processing job.
6768
// Each processing job only deletes one resource type, but it contains a comma seperated list of all resource types to be deleted. Once one type is deleted it will start a new job to delete the next one.
68-
internal static async Task<BulkDeleteDefinition> CreateProcessingDefinition(BulkDeleteDefinition baseDefinition, ISearchService searchService, IList<string> resourceTypes, CancellationToken cancellationToken)
69+
internal static async Task<BulkDeleteDefinition> CreateProcessingDefinition(JobInfo jobInfo, BulkDeleteDefinition baseDefinition, ISearchService searchService, IList<string> resourceTypes, CancellationToken cancellationToken)
6970
{
71+
var createDate = new PartialDateTime(new DateTimeOffset(jobInfo.CreateDate, TimeSpan.Zero));
7072
var searchParameters = new List<Tuple<string, string>>()
7173
{
72-
new Tuple<string, string>(KnownQueryParameterNames.Summary, "count"),
74+
new Tuple<string, string>(KnownQueryParameterNames.LastUpdated, $"lt{createDate}"),
7375
};
7476

7577
if (baseDefinition.SearchParameters != null)
7678
{
7779
searchParameters.AddRange(baseDefinition.SearchParameters);
7880
}
7981

82+
var countSearchParameters = new List<Tuple<string, string>>(searchParameters);
83+
countSearchParameters.Add(new Tuple<string, string>(KnownQueryParameterNames.Summary, "count"));
84+
8085
while (resourceTypes.Count > 0)
8186
{
82-
int numResources = (await searchService.SearchAsync(resourceTypes[0], searchParameters, cancellationToken, resourceVersionTypes: baseDefinition.VersionType)).TotalCount.GetValueOrDefault();
87+
int numResources = (await searchService.SearchAsync(resourceTypes[0], countSearchParameters, cancellationToken, resourceVersionTypes: baseDefinition.VersionType)).TotalCount.GetValueOrDefault();
8388

8489
if (numResources == 0)
8590
{

src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/BulkDeleteProcessingJob.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public async Task<string> ExecuteAsync(JobInfo jobInfo, CancellationToken cancel
129129
{
130130
types.RemoveAt(0);
131131
using var searchService = _searchService.Invoke();
132-
BulkDeleteDefinition processingDefinition = await BulkDeleteOrchestratorJob.CreateProcessingDefinition(definition, searchService.Value, types, cancellationToken);
132+
BulkDeleteDefinition processingDefinition = await BulkDeleteOrchestratorJob.CreateProcessingDefinition(jobInfo, definition, searchService.Value, types, cancellationToken);
133133

134134
if (processingDefinition != null)
135135
{

src/Microsoft.Health.Fhir.Core/Features/Operations/BulkDelete/Handlers/CreateBulkDeleteHandler.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,7 @@ public async Task<CreateBulkDeleteResponse> Handle(CreateBulkDeleteRequest reque
6262

6363
await _authorizationService.CheckAccess(requiredDataAction, true, cancellationToken);
6464

65-
var searchParameters = new List<Tuple<string, string>>(request.ConditionalParameters); // remove read only restriction
66-
var dateCurrent = new PartialDateTime(Clock.UtcNow);
67-
searchParameters.Add(Tuple.Create("_lastUpdated", $"lt{dateCurrent}"));
65+
var searchParameters = new List<Tuple<string, string>>(request.ConditionalParameters);
6866

6967
// Should not run bulk delete if any of the search parameters are invalid as it can lead to unpredicatable results
7068
await _searchService.ConditionalSearchAsync(request.ResourceType, searchParameters, cancellationToken, count: 1, logger: _logger);

src/Microsoft.Health.Fhir.Core/Features/Operations/BulkUpdate/BulkUpdateOrchestratorJob.cs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ public async Task<string> ExecuteAsync(JobInfo jobInfo, CancellationToken cancel
138138
}
139139

140140
_logger.LogJobInformation(jobInfo, "Creating bulk update definition (1).");
141-
var processingDefinition = CreateProcessingDefinition(definition, searchService.Value, cancellationToken, type, continuationToken: null, startSurrogateId: range.StartId.ToString(), endSurrogateId: range.EndId.ToString(), globalStartSurrogateId: globalStartId.ToString(), globalEndSurrogateId: globalEndId.ToString());
141+
var processingDefinition = CreateProcessingDefinition(definition, jobInfo.CreateDate, type, continuationToken: null, startSurrogateId: range.StartId.ToString(), endSurrogateId: range.EndId.ToString(), globalStartSurrogateId: globalStartId.ToString(), globalEndSurrogateId: globalEndId.ToString());
142142
definitions.Add(processingDefinition);
143143
}
144144

@@ -174,6 +174,10 @@ public async Task<string> ExecuteAsync(JobInfo jobInfo, CancellationToken cancel
174174
SearchResult searchResult;
175175

176176
var searchParams = definition.SearchParameters?.ToList() ?? new List<Tuple<string, string>>();
177+
178+
var createDate = new PartialDateTime(new DateTimeOffset(jobInfo.CreateDate, TimeSpan.Zero));
179+
searchParams.Add(Tuple.Create(KnownQueryParameterNames.LastUpdated, $"lt{createDate}"));
180+
177181
searchParams.Add(Tuple.Create(KnownQueryParameterNames.Count, definition.MaximumNumberOfResourcesPerQuery.ToString(CultureInfo.InvariantCulture)));
178182
if (!string.IsNullOrEmpty(lastEnqueuedMaxContinuationToken))
179183
{
@@ -207,7 +211,7 @@ public async Task<string> ExecuteAsync(JobInfo jobInfo, CancellationToken cancel
207211

208212
// Enqueue the job for the current page of results
209213
_logger.LogJobInformation(jobInfo, "Creating bulk update definition (3).");
210-
var processingRecord = CreateProcessingDefinition(definition, searchService.Value, cancellationToken, definition.Type, prevContinuationToken, false);
214+
var processingRecord = CreateProcessingDefinition(definition, jobInfo.CreateDate, definition.Type, prevContinuationToken, false);
211215

212216
_logger.LogJobInformation(jobInfo, "Enqueuing bulk update job (4).");
213217
await _queueClient.EnqueueAsync(QueueType.BulkUpdate, cancellationToken, groupId: jobInfo.GroupId, definitions: processingRecord);
@@ -230,7 +234,7 @@ public async Task<string> ExecuteAsync(JobInfo jobInfo, CancellationToken cancel
230234
else if (groupJobs.Count == 1)
231235
{
232236
_logger.LogJobInformation(jobInfo, "Creating bulk update definition (5).");
233-
var processingRecord = CreateProcessingDefinition(definition, searchService.Value, cancellationToken, definition.Type, null, true);
237+
var processingRecord = CreateProcessingDefinition(definition, jobInfo.CreateDate, definition.Type, null, true);
234238
_logger.LogJobInformation(jobInfo, "Enqueuing bulk update job (5).");
235239
await _queueClient.EnqueueAsync(QueueType.BulkUpdate, cancellationToken, groupId: jobInfo.GroupId, definitions: processingRecord);
236240
}
@@ -269,22 +273,17 @@ private static async Task<SearchResult> Search(BulkUpdateDefinition definition,
269273
isIncludesOperation: false);
270274
}
271275

272-
internal static BulkUpdateDefinition CreateProcessingDefinition(BulkUpdateDefinition baseDefinition, ISearchService searchService, CancellationToken cancellationToken, string resourceType = null, string continuationToken = null, bool readNextPage = false, string startSurrogateId = null, string endSurrogateId = null, string globalStartSurrogateId = null, string globalEndSurrogateId = null)
276+
internal static BulkUpdateDefinition CreateProcessingDefinition(BulkUpdateDefinition baseDefinition, DateTime jobCreateDate, string resourceType = null, string continuationToken = null, bool readNextPage = false, string startSurrogateId = null, string endSurrogateId = null, string globalStartSurrogateId = null, string globalEndSurrogateId = null)
273277
{
274-
var searchParameters = new List<Tuple<string, string>>()
278+
var createDate = new PartialDateTime(new DateTimeOffset(jobCreateDate, TimeSpan.Zero));
279+
var cloneList = new List<Tuple<string, string>>()
275280
{
276-
new Tuple<string, string>(KnownQueryParameterNames.Summary, "count"),
281+
new Tuple<string, string>(KnownQueryParameterNames.LastUpdated, $"lt{createDate}"),
277282
};
278283

279284
if (baseDefinition.SearchParameters != null)
280285
{
281-
searchParameters.AddRange(baseDefinition.SearchParameters);
282-
}
283-
284-
var cloneList = new List<Tuple<string, string>>();
285-
if (baseDefinition.SearchParameters != null)
286-
{
287-
cloneList = baseDefinition.SearchParameters.ToList();
286+
cloneList.AddRange(baseDefinition.SearchParameters);
288287
}
289288

290289
if (!string.IsNullOrEmpty(continuationToken))

src/Microsoft.Health.Fhir.Core/Features/Operations/BulkUpdate/Handlers/CreateBulkUpdateHandler.cs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,6 @@ public async Task<CreateBulkUpdateResponse> Handle(CreateBulkUpdateRequest reque
8383
searchParameters.AddRange(request.ConditionalParameters);
8484
}
8585

86-
var dateCurrent = new PartialDateTime(Clock.UtcNow);
87-
searchParameters.Add(Tuple.Create("_lastUpdated", $"lt{dateCurrent}"));
88-
8986
// Remove bulk update specific parameters from search parameters
9087
searchParameters.RemoveAll(t => BulkUpdateQueryParameters.Any(param => param.Equals(t.Item1, StringComparison.OrdinalIgnoreCase)));
9188

0 commit comments

Comments
 (0)