Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 65 additions & 40 deletions src/OrganisationRegistry.ElasticSearch.Projections/BaseRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ namespace OrganisationRegistry.ElasticSearch.Projections;
using System.Threading.Tasks;
using Client;
using Configuration;
using ElasticSearch.Bodies;
using ElasticSearch.Organisations;
using Infrastructure;
using Infrastructure.Change;
using Microsoft.Extensions.Logging;
Expand All @@ -19,59 +21,57 @@ namespace OrganisationRegistry.ElasticSearch.Projections;
using SqlServer.ElasticSearchProjections;
using SqlServer.ProjectionState;

public record ProjectionName(string ElasticSearchProjectionsProjectionName, string FullName, string Name);
public abstract class BaseRunner<T> where T: class, IDocument, new()
{
public string ProjectionName { get; }
public string ProjectionName
=> _projectionName.Name;

public Type[] EventHandlers { get; }

private readonly string _elasticSearchProjectionsProjectionName;
private readonly string _projectionFullName;
private readonly Elastic _elastic;

private readonly int _batchSize;
private readonly ILogger<BaseRunner<T>> _logger;
private readonly IEventStore _store;
private readonly IProjectionStates _projectionStates;
private readonly ElasticBus _bus;
private readonly IContextFactory _contextFactory;
public IContextFactory ContextFactory { get; }
private readonly OpenTelemetryMetrics.ElasticSearchProjections _metrics;
private readonly ProjectionName _projectionName;

protected BaseRunner(
ILogger<BaseRunner<T>> logger,
IOptions<ElasticSearchConfiguration> configuration,
IEventStore store,
IProjectionStates projectionStates,
string elasticSearchProjectionsProjectionName,
string projectionFullName,
string projectionName,
Type[] eventHandlers,
Elastic elastic,
ElasticBus bus,
IContextFactory contextFactory)
IContextFactory contextFactory,
ProjectionName projectionName)
{
_logger = logger;
_store = store;
_projectionStates = projectionStates;
_bus = bus;
_contextFactory = contextFactory;
ContextFactory = contextFactory;
_projectionName = projectionName;

_batchSize = configuration.Value.BatchSize;
_elasticSearchProjectionsProjectionName = elasticSearchProjectionsProjectionName;
_projectionFullName = projectionFullName;
_elastic = elastic;

ProjectionName = projectionName;
EventHandlers = eventHandlers;

_metrics = new OpenTelemetryMetrics.ElasticSearchProjections(projectionName);
_metrics = new OpenTelemetryMetrics.ElasticSearchProjections(_projectionName.Name);
}

public async Task Run()
{
var maxEventNumberToProcess = _store.GetLastEvent();
_metrics.MaxEventNumberToProcessGauge = _metrics.MaxEventNumberToProcessCounter = maxEventNumberToProcess;

var lastProcessedEventNumber = await _projectionStates.GetLastProcessedEventNumber(_elasticSearchProjectionsProjectionName);
var lastProcessedEventNumber = await _projectionStates.GetLastProcessedEventNumber(_projectionName.ElasticSearchProjectionsProjectionName);
var envelopesBehind = maxEventNumberToProcess - lastProcessedEventNumber;
_metrics.NumberOfEnvelopesBehindGauge = _metrics.NumberOfEnvelopesBehindCounter = envelopesBehind;
_metrics.NumberOfEnvelopesBehindHistogram.Record(envelopesBehind);
Expand Down Expand Up @@ -128,21 +128,22 @@ public async Task Run()
_metrics.NumberOfEnvelopesHandledGauge = envelopes.Count;
_metrics.NumberOfEnvelopesHandledCounter = envelopes.Count;
}
catch (ElasticsearchOrganisationNotFoundException organisationNotFoundException)
catch (ElasticsearchAggregateNotFoundException organisationNotFoundException)
{
await using var organisationRegistryContext = _contextFactory.Create();
await using var organisationRegistryContext = ContextFactory.Create();

organisationRegistryContext.OrganisationsToRebuild.Add(
new OrganisationToRebuild
{
OrganisationId = Guid.Parse(organisationNotFoundException.OrganisationId)
OrganisationId = Guid.Parse(organisationNotFoundException.AggregateId)
});
await organisationRegistryContext.SaveChangesAsync();
_logger.LogWarning(
0,
organisationNotFoundException,
"[{ProjectionName}] Could not find {OrganisationId} in ES while processing envelope #{EnvelopeNumber}, adding it to organisations to rebuild",
ProjectionName,
organisationNotFoundException.OrganisationId,
organisationNotFoundException.AggregateId,
newLastProcessedEventNumber);
throw;
}
Expand All @@ -168,26 +169,7 @@ private async Task ProcessChange(IElasticChange? changeSetChange, Dictionary<Gui
}
case ElasticPerDocumentChange<T> perDocumentChange:
{
foreach (var documentChange in perDocumentChange.Changes)
{
T? document;

if (!documentCache.ContainsKey(documentChange.Key))
{
document = (await _elastic.TryGetAsync(async () =>
(await _elastic.WriteClient.GetAsync<T>(documentChange.Key))
.ThrowOnFailure()))
.Source;

documentCache.Add(documentChange.Key, document);
}
else
{
document = documentCache[documentChange.Key];
}

await documentChange.Value(document);
}
await HandlePerDocumentChange(documentCache, perDocumentChange);

break;
}
Expand All @@ -206,6 +188,49 @@ await _elastic.TryGetAsync(async () =>
}
}

private async Task HandlePerDocumentChange(Dictionary<Guid, T> documentCache, ElasticPerDocumentChange<T> perDocumentChange)
{
try
{
foreach (var documentChange in perDocumentChange.Changes)
{
T? document;

if (!documentCache.ContainsKey(documentChange.Key))
{
document = (await _elastic.TryGetAsync(async () =>
(await _elastic.WriteClient.GetAsync<T>(documentChange.Key))
.ThrowOnFailure()))
.Source;

documentCache.Add(documentChange.Key, document);
}
else
{
document = documentCache[documentChange.Key];
}

await documentChange.Value(document);
}
}
catch (ElasticsearchPerDocumentChangeException e)
{
await HandlePerDocumentChangeException(e);
_logger.LogWarning(
0,
e,
"[{ProjectionName}] Error occured for {AggregateId} in ES while processing envelope #{EnvelopeNumber}, adding it to entities to rebuild",
ProjectionName,
e.AggregateId,
e.EnvelopeNumber);

throw;
}
}

protected virtual Task HandlePerDocumentChangeException(ElasticsearchPerDocumentChangeException e)
=> Task.CompletedTask;

private async Task FlushDocuments(Dictionary<Guid, T> documentCache)
{
if (documentCache.Any())
Expand Down Expand Up @@ -241,7 +266,7 @@ private async Task InitialiseProjection(int lastProcessedEventNumber)
return;

_logger.LogInformation("[{ProjectionName}] First run, initialising projections!", ProjectionName);
await ProcessEnvelope(new InitialiseProjection(_projectionFullName).ToTypedEnvelope());
await ProcessEnvelope(new InitialiseProjection(_projectionName.FullName).ToTypedEnvelope());
}

private async Task UpdateProjectionState(int? newLastProcessedEventNumber)
Expand All @@ -250,7 +275,7 @@ private async Task UpdateProjectionState(int? newLastProcessedEventNumber)
return;

_logger.LogInformation("[{ProjectionName}] Processed up until envelope #{LastProcessedEnvelopeNumber}", ProjectionName, newLastProcessedEventNumber);
await _projectionStates.UpdateProjectionState(_elasticSearchProjectionsProjectionName, newLastProcessedEventNumber.Value);
await _projectionStates.UpdateProjectionState(_projectionName.ElasticSearchProjectionsProjectionName, newLastProcessedEventNumber.Value);

_metrics.LastProcessedEventNumberGauge = _metrics.LastProcessedEventNumberCounter = newLastProcessedEventNumber.Value;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
namespace OrganisationRegistry.ElasticSearch.Projections.Body;

using System;
using System.Threading.Tasks;
using Client;
using Configuration;
using ElasticSearch.Bodies;
using Bodies;
using Infrastructure;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using OrganisationRegistry.Infrastructure.Events;
using SqlServer;
using SqlServer.ElasticSearchProjections;
using SqlServer.ProjectionState;

public class BodyRunner : BaseRunner<BodyDocument>
Expand Down Expand Up @@ -36,14 +38,24 @@ public BodyRunner(
configuration,
store,
projectionStates,
ElasticSearchProjectionsProjectionName,
ProjectionFullName,
ProjectionName,
EventHandlers,
elastic,
bus,
contextFactory)
contextFactory,
new ProjectionName(ElasticSearchProjectionsProjectionName, ProjectionFullName, ProjectionName))
{
busRegistrar.RegisterEventHandlers(EventHandlers);
}

protected override async Task HandlePerDocumentChangeException(ElasticsearchPerDocumentChangeException e)
{
await using var organisationRegistryContext = ContextFactory.Create();

organisationRegistryContext.BodiesToRebuild.Add(
new BodyToRebuild
{
BodyId = e.AggregateId,
});
await organisationRegistryContext.SaveChangesAsync();
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
namespace OrganisationRegistry.ElasticSearch.Projections.Bodies;
namespace OrganisationRegistry.ElasticSearch.Projections.Body;

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Body;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Bodies;
using Client;
using ElasticSearch.Bodies;
using Infrastructure;
using Infrastructure.Change;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using OpenTelemetry;
using Osc;
using OrganisationRegistry.Infrastructure.Events;
using SqlServer;
using SqlServer.ProjectionState;
using Osc;

public class IndividualBodyRebuildRunner
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace OrganisationRegistry.ElasticSearch.Projections.Body;
using System.Linq.Expressions;
using System.Threading.Tasks;
using Client;
using ElasticSearch.Bodies;
using Bodies;
using Osc;

public static class MassUpdateBodyExtension
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ namespace OrganisationRegistry.ElasticSearch.Projections;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Bodies;
using Body;
using Cache;
using Microsoft.EntityFrameworkCore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,29 @@ namespace OrganisationRegistry.ElasticSearch.Projections.Infrastructure.Change;

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Be.Vlaanderen.Basisregisters.Utilities;
using Client;
using Humanizer;

public class ElasticPerDocumentChange<T> : IElasticChange where T: IDocument
{
public ElasticPerDocumentChange(Guid id, Action<T> change)
{
Changes = new Dictionary<Guid, Func<T, Task>> {{id, doc =>
{
change(doc);
return Task.CompletedTask;
try{
change(doc);
return Task.CompletedTask;
}catch (InvalidOperationException)
{
return Task.FromException(new ElasticsearchPerDocumentChangeException(doc.Id, doc.ChangeId));
}
}
}};
}

public ElasticPerDocumentChange(Dictionary<Guid, Action<T>> changes)
{
Changes = new Dictionary<Guid, Func<T, Task>>();
foreach (var change in changes)
{
Func<T, Task> changeAction = doc =>
{
change.Value(doc);
return Task.CompletedTask;
};

Changes.Add(change.Key, changeAction);
}
}

public ElasticPerDocumentChange(Guid id, Func<T, Task> change)
{
Expand All @@ -43,3 +38,4 @@ public ElasticPerDocumentChange(Dictionary<Guid, Func<T, Task>> changes)

public Dictionary<Guid, Func<T, Task>> Changes { get; init; }
}

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace OrganisationRegistry.ElasticSearch.Projections.Organisations;

using System;
using System.Threading.Tasks;
using Client;
using Configuration;
using ElasticSearch.Organisations;
Expand All @@ -9,6 +10,7 @@ namespace OrganisationRegistry.ElasticSearch.Projections.Organisations;
using Microsoft.Extensions.Options;
using OrganisationRegistry.Infrastructure.Events;
using SqlServer;
using SqlServer.ElasticSearchProjections;
using SqlServer.ProjectionState;

public class OrganisationsRunner : BaseRunner<OrganisationDocument>
Expand Down Expand Up @@ -51,14 +53,24 @@ public OrganisationsRunner(
configuration,
store,
projectionStates,
ElasticSearchProjectionsProjectionName,
ProjectionFullName,
ProjectionName,
EventHandlers,
elastic,
bus,
contextFactory)
contextFactory,
new ProjectionName(ElasticSearchProjectionsProjectionName, ProjectionFullName, ProjectionName))
{
busRegistrar.RegisterEventHandlers(EventHandlers);
}

protected override async Task HandlePerDocumentChangeException(ElasticsearchPerDocumentChangeException e)
{
await using var organisationRegistryContext = ContextFactory.Create();

organisationRegistryContext.OrganisationsToRebuild.Add(
new OrganisationToRebuild
{
OrganisationId = e.AggregateId,
});
await organisationRegistryContext.SaveChangesAsync();
}
}
Loading