From 02c47a5e7a46f143ce00b260baa16ecaa6cf2037 Mon Sep 17 00:00:00 2001 From: Sergey Galuzo Date: Fri, 17 Jan 2025 21:43:40 -0800 Subject: [PATCH] Single resource per blob --- .../Storage/SqlServerFhirDataStore.cs | 32 +++++++------------ .../Features/Storage/SqlStoreClient.cs | 20 ++++++------ 2 files changed, 21 insertions(+), 31 deletions(-) diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs index 0abe108f58..b9138bfc6f 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs @@ -131,12 +131,12 @@ public SqlServerFhirDataStore( internal static TimeSpan MergeResourcesTransactionHeartbeatPeriod => TimeSpan.FromSeconds(10); - private async Task DeleteBlobFromAdlsAsync(long transactionId, CancellationToken cancellationToken) + private async Task DeleteBlobFromAdlsAsync(long transactionId, int blobIndex, CancellationToken cancellationToken) { var start = DateTime.UtcNow; var sw = Stopwatch.StartNew(); var retries = 0; - var blobName = GetBlobNameForRaw(transactionId); + var blobName = GetBlobNameForRaw(transactionId, blobIndex); while (true) { try @@ -165,32 +165,24 @@ private async Task PutRawResourcesIntoAdlsAsync(IReadOnlyList { - resource.FileId = transactionId; - resource.OffsetInFile = offset; - var line = resource.ResourceWrapper.RawResource.Data; - offset += Encoding.UTF8.GetByteCount(line) + eol; - await writer.WriteLineAsync(line); - } + resources[index].FileId = transactionId; + resources[index].OffsetInFile = index; + var blobClient = SqlAdlsClient.Container.GetBlobClient(GetBlobNameForRaw(transactionId, index)); + blobClient.Upload(BinaryData.FromString(resources[index].ResourceWrapper.RawResource.Data + Environment.NewLine), overwrite: true); + }); - #pragma warning disable CA2016 - await writer.FlushAsync(); break; } catch (Exception e) { - await StoreClient.TryLogEvent("PutRawResourcesIntoAdlsAsync", "Error", $"blob={blobName} error={e}", start, cancellationToken); + await StoreClient.TryLogEvent("PutRawResourcesIntoAdlsAsync", "Error", $"transactionId={transactionId} error={e}", start, cancellationToken); if (e.ToString().Contains("ConditionNotMet", StringComparison.OrdinalIgnoreCase) && retries++ < 3) { await Task.Delay(1000, cancellationToken); @@ -202,12 +194,12 @@ private async Task PutRawResourcesIntoAdlsAsync(IReadOnlyList> ReadResourceWrappersAsync(Sql throw new InvalidOperationException("ADLS container is null."); } - var resourceRefsByFile = resourceRefs.GroupBy(_ => _.FileId); - foreach (var file in resourceRefsByFile) + Parallel.ForEach(resourceRefs, resourceRef => { - var blobName = SqlServerFhirDataStore.GetBlobNameForRaw(file.Key); + var blobName = SqlServerFhirDataStore.GetBlobNameForRaw(resourceRef.FileId, resourceRef.OffsetInFile); var blobClient = SqlAdlsClient.Container.GetBlobClient(blobName); - using var stream = blobClient.OpenRead(); - using var reader = new StreamReader(stream); - foreach (var offset in file) + var result = blobClient.Download(); + using var streamReader = new StreamReader(result.Value.Content); + var rawResource = streamReader.ReadLine(); + lock (results) { - reader.DiscardBufferedData(); - stream.Position = offset.OffsetInFile; - var line = reader.ReadLine(); - results.Add((file.Key, offset.OffsetInFile), line); + results.Add((resourceRef.FileId, resourceRef.OffsetInFile), rawResource); } - } + }); return results; }