Skip to content

Commit

Permalink
Single resource per blob
Browse files Browse the repository at this point in the history
  • Loading branch information
SergeyGaluzo committed Jan 18, 2025
1 parent 9e1d8a6 commit 02c47a5
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,12 @@ public SqlServerFhirDataStore(

internal static TimeSpan MergeResourcesTransactionHeartbeatPeriod => TimeSpan.FromSeconds(10);

private async Task DeleteBlobFromAdlsAsync(long transactionId, CancellationToken cancellationToken)
private async Task DeleteBlobFromAdlsAsync(long transactionId, int blobIndex, CancellationToken cancellationToken)
{
var start = DateTime.UtcNow;
var sw = Stopwatch.StartNew();
var retries = 0;
var blobName = GetBlobNameForRaw(transactionId);
var blobName = GetBlobNameForRaw(transactionId, blobIndex);
while (true)
{
try
Expand Down Expand Up @@ -165,32 +165,24 @@ private async Task PutRawResourcesIntoAdlsAsync(IReadOnlyList<MergeResourceWrapp
{
var start = DateTime.UtcNow;
var sw = Stopwatch.StartNew();
var eol = Encoding.UTF8.GetByteCount(Environment.NewLine);
var retries = 0;
var blobName = GetBlobNameForRaw(transactionId);
while (true)
{
try
{
using var stream = await SqlAdlsClient.Container.GetBlockBlobClient(blobName).OpenWriteAsync(true, null, cancellationToken);
using var writer = new StreamWriter(stream);
var offset = 0;
foreach (var resource in resources)
Parallel.For(0, resources.Count, index =>
{
resource.FileId = transactionId;
resource.OffsetInFile = offset;
var line = resource.ResourceWrapper.RawResource.Data;
offset += Encoding.UTF8.GetByteCount(line) + eol;
await writer.WriteLineAsync(line);
}
resources[index].FileId = transactionId;
resources[index].OffsetInFile = index;
var blobClient = SqlAdlsClient.Container.GetBlobClient(GetBlobNameForRaw(transactionId, index));
blobClient.Upload(BinaryData.FromString(resources[index].ResourceWrapper.RawResource.Data + Environment.NewLine), overwrite: true);
});

#pragma warning disable CA2016
await writer.FlushAsync();
break;
}
catch (Exception e)
{
await StoreClient.TryLogEvent("PutRawResourcesIntoAdlsAsync", "Error", $"blob={blobName} error={e}", start, cancellationToken);
await StoreClient.TryLogEvent("PutRawResourcesIntoAdlsAsync", "Error", $"transactionId={transactionId} error={e}", start, cancellationToken);
if (e.ToString().Contains("ConditionNotMet", StringComparison.OrdinalIgnoreCase) && retries++ < 3)
{
await Task.Delay(1000, cancellationToken);
Expand All @@ -202,12 +194,12 @@ private async Task PutRawResourcesIntoAdlsAsync(IReadOnlyList<MergeResourceWrapp
}

var mcsec = (long)Math.Round(sw.Elapsed.TotalMilliseconds * 1000, 0);
await StoreClient.TryLogEvent("PutRawResourcesToAdls", "Warn", $"mcsec={mcsec} resources={resources.Count} blob={blobName}", start, cancellationToken);
await StoreClient.TryLogEvent("PutRawResourcesToAdls", "Warn", $"mcsec={mcsec} resources={resources.Count} transactionId ={transactionId}", start, cancellationToken);
}

internal static string GetBlobNameForRaw(long fileId)
internal static string GetBlobNameForRaw(long fileId, int blobIndex)
{
return $"hash-{GetPermanentHashCode(fileId)}/transaction-{fileId}.ndjson";
return $"hash-{GetPermanentHashCode(fileId)}/transaction-{fileId}-index-{blobIndex}.ndjson";
}

private static string GetPermanentHashCode(long tr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Globalization;
using System.IO;
using System.Linq;
using System.Resources;
using System.Threading;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
Expand Down Expand Up @@ -119,21 +120,18 @@ private async Task<IReadOnlyList<ResourceWrapper>> ReadResourceWrappersAsync(Sql
throw new InvalidOperationException("ADLS container is null.");
}

var resourceRefsByFile = resourceRefs.GroupBy(_ => _.FileId);
foreach (var file in resourceRefsByFile)
Parallel.ForEach(resourceRefs, resourceRef =>
{
var blobName = SqlServerFhirDataStore.GetBlobNameForRaw(file.Key);
var blobName = SqlServerFhirDataStore.GetBlobNameForRaw(resourceRef.FileId, resourceRef.OffsetInFile);
var blobClient = SqlAdlsClient.Container.GetBlobClient(blobName);
using var stream = blobClient.OpenRead();
using var reader = new StreamReader(stream);
foreach (var offset in file)
var result = blobClient.Download();
using var streamReader = new StreamReader(result.Value.Content);
var rawResource = streamReader.ReadLine();
lock (results)
{
reader.DiscardBufferedData();
stream.Position = offset.OffsetInFile;
var line = reader.ReadLine();
results.Add((file.Key, offset.OffsetInFile), line);
results.Add((resourceRef.FileId, resourceRef.OffsetInFile), rawResource);
}
}
});

return results;
}
Expand Down

0 comments on commit 02c47a5

Please sign in to comment.