Skip to content

Commit 6305f8c

Browse files
koenmetsuemalfroy
authored andcommitted
fix: or-2893 ensure bodies rebuild on failure
1 parent 63ad532 commit 6305f8c

File tree

3 files changed

+108
-45
lines changed

3 files changed

+108
-45
lines changed

src/OrganisationRegistry.ElasticSearch.Projections/BaseRunner.cs

Lines changed: 77 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ namespace OrganisationRegistry.ElasticSearch.Projections;
88
using System.Threading.Tasks;
99
using Client;
1010
using Configuration;
11+
using ElasticSearch.Bodies;
12+
using ElasticSearch.Organisations;
1113
using Infrastructure;
1214
using Infrastructure.Change;
1315
using Microsoft.Extensions.Logging;
@@ -128,21 +130,22 @@ public async Task Run()
128130
_metrics.NumberOfEnvelopesHandledGauge = envelopes.Count;
129131
_metrics.NumberOfEnvelopesHandledCounter = envelopes.Count;
130132
}
131-
catch (ElasticsearchOrganisationNotFoundException organisationNotFoundException)
133+
catch (ElasticsearchAggregateNotFoundException organisationNotFoundException)
132134
{
133135
await using var organisationRegistryContext = _contextFactory.Create();
136+
134137
organisationRegistryContext.OrganisationsToRebuild.Add(
135138
new OrganisationToRebuild
136139
{
137-
OrganisationId = Guid.Parse(organisationNotFoundException.OrganisationId)
140+
OrganisationId = Guid.Parse(organisationNotFoundException.AggregateId)
138141
});
139142
await organisationRegistryContext.SaveChangesAsync();
140143
_logger.LogWarning(
141144
0,
142145
organisationNotFoundException,
143146
"[{ProjectionName}] Could not find {OrganisationId} in ES while processing envelope #{EnvelopeNumber}, adding it to organisations to rebuild",
144147
ProjectionName,
145-
organisationNotFoundException.OrganisationId,
148+
organisationNotFoundException.AggregateId,
146149
newLastProcessedEventNumber);
147150
throw;
148151
}
@@ -168,26 +171,7 @@ private async Task ProcessChange(IElasticChange? changeSetChange, Dictionary<Gui
168171
}
169172
case ElasticPerDocumentChange<T> perDocumentChange:
170173
{
171-
foreach (var documentChange in perDocumentChange.Changes)
172-
{
173-
T? document;
174-
175-
if (!documentCache.ContainsKey(documentChange.Key))
176-
{
177-
document = (await _elastic.TryGetAsync(async () =>
178-
(await _elastic.WriteClient.GetAsync<T>(documentChange.Key))
179-
.ThrowOnFailure()))
180-
.Source;
181-
182-
documentCache.Add(documentChange.Key, document);
183-
}
184-
else
185-
{
186-
document = documentCache[documentChange.Key];
187-
}
188-
189-
await documentChange.Value(document);
190-
}
174+
await HandlePerDocumentChange(documentCache, perDocumentChange);
191175

192176
break;
193177
}
@@ -206,6 +190,76 @@ await _elastic.TryGetAsync(async () =>
206190
}
207191
}
208192

193+
private async Task HandlePerDocumentChange(Dictionary<Guid, T> documentCache, ElasticPerDocumentChange<T> perDocumentChange)
194+
{
195+
try
196+
{
197+
foreach (var documentChange in perDocumentChange.Changes)
198+
{
199+
T? document;
200+
201+
if (!documentCache.ContainsKey(documentChange.Key))
202+
{
203+
document = (await _elastic.TryGetAsync(async () =>
204+
(await _elastic.WriteClient.GetAsync<T>(documentChange.Key))
205+
.ThrowOnFailure()))
206+
.Source;
207+
208+
documentCache.Add(documentChange.Key, document);
209+
}
210+
else
211+
{
212+
document = documentCache[documentChange.Key];
213+
}
214+
215+
await documentChange.Value(document);
216+
}
217+
}
218+
catch (ElasticsearchPerDocumentChangeException e)
219+
{
220+
await using var organisationRegistryContext = _contextFactory.Create();
221+
222+
switch (perDocumentChange)
223+
{
224+
case ElasticPerDocumentChange<OrganisationDocument>:
225+
organisationRegistryContext.OrganisationsToRebuild.Add(
226+
new OrganisationToRebuild
227+
{
228+
OrganisationId = e.AggregateId,
229+
});
230+
await organisationRegistryContext.SaveChangesAsync();
231+
_logger.LogWarning(
232+
0,
233+
e,
234+
"[{ProjectionName}] Error occured for {AggregateId} in ES while processing envelope #{EnvelopeNumber}, adding it to entities to rebuild",
235+
ProjectionName,
236+
e.AggregateId,
237+
e.EnvelopeNumber);
238+
239+
break;
240+
241+
case ElasticPerDocumentChange<BodyDocument>:
242+
organisationRegistryContext.BodiesToRebuild.Add(
243+
new BodyToRebuild()
244+
{
245+
BodyId = e.AggregateId,
246+
});
247+
await organisationRegistryContext.SaveChangesAsync();
248+
_logger.LogWarning(
249+
0,
250+
e,
251+
"[{ProjectionName}] Error occured for {AggregateId} in ES while processing envelope #{EnvelopeNumber}, adding it to entities to rebuild",
252+
ProjectionName,
253+
e.AggregateId,
254+
e.EnvelopeNumber);
255+
256+
break;
257+
}
258+
259+
throw;
260+
}
261+
}
262+
209263
private async Task FlushDocuments(Dictionary<Guid, T> documentCache)
210264
{
211265
if (documentCache.Any())

src/OrganisationRegistry.ElasticSearch.Projections/Infrastructure/Change/ElasticPerDocumentChange.cs

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,34 +2,29 @@ namespace OrganisationRegistry.ElasticSearch.Projections.Infrastructure.Change;
22

33
using System;
44
using System.Collections.Generic;
5+
using System.Linq;
56
using System.Threading.Tasks;
7+
using Be.Vlaanderen.Basisregisters.Utilities;
8+
using Client;
9+
using Humanizer;
610

711
public class ElasticPerDocumentChange<T> : IElasticChange where T: IDocument
812
{
913
public ElasticPerDocumentChange(Guid id, Action<T> change)
1014
{
1115
Changes = new Dictionary<Guid, Func<T, Task>> {{id, doc =>
1216
{
13-
change(doc);
14-
return Task.CompletedTask;
17+
try{
18+
change(doc);
19+
return Task.CompletedTask;
20+
}catch (InvalidOperationException)
21+
{
22+
return Task.FromException(new ElasticsearchPerDocumentChangeException(doc.Id, doc.ChangeId));
23+
}
1524
}
1625
}};
1726
}
1827

19-
public ElasticPerDocumentChange(Dictionary<Guid, Action<T>> changes)
20-
{
21-
Changes = new Dictionary<Guid, Func<T, Task>>();
22-
foreach (var change in changes)
23-
{
24-
Func<T, Task> changeAction = doc =>
25-
{
26-
change.Value(doc);
27-
return Task.CompletedTask;
28-
};
29-
30-
Changes.Add(change.Key, changeAction);
31-
}
32-
}
3328

3429
public ElasticPerDocumentChange(Guid id, Func<T, Task> change)
3530
{
@@ -43,3 +38,4 @@ public ElasticPerDocumentChange(Dictionary<Guid, Func<T, Task>> changes)
4338

4439
public Dictionary<Guid, Func<T, Task>> Changes { get; init; }
4540
}
41+

src/OrganisationRegistry.ElasticSearch/Client/ThrowOnFailureExtension.cs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ public static IGetResponse<T> ThrowOnFailure<T>(this IGetResponse<T> response) w
2222
{
2323
if (response.IsValid) return response;
2424

25-
if (response.ApiCall.HttpStatusCode == 404 && typeof(T) == typeof(OrganisationDocument))
26-
throw new ElasticsearchOrganisationNotFoundException(((GetResponse<OrganisationDocument>)response).Id);
25+
if (response.ApiCall.HttpStatusCode == 404)
26+
throw new ElasticsearchAggregateNotFoundException(((GetResponse<T>)response).Id);
2727

2828
throw new ElasticsearchException(response.DebugInformation, response.OriginalException);
2929
}
@@ -76,12 +76,25 @@ public static void ThrowOnFailure(this BulkResponse response)
7676
}
7777
}
7878

79-
public class ElasticsearchOrganisationNotFoundException : ElasticsearchException
79+
public class ElasticsearchAggregateNotFoundException : ElasticsearchException
8080
{
81-
public string OrganisationId { get; }
81+
public string AggregateId { get; }
8282

83-
public ElasticsearchOrganisationNotFoundException(string organisationId)
83+
public ElasticsearchAggregateNotFoundException(string aggregateId)
8484
{
85-
OrganisationId = organisationId;
85+
AggregateId = aggregateId;
86+
}
87+
}
88+
89+
90+
public class ElasticsearchPerDocumentChangeException : ElasticsearchException
91+
{
92+
public Guid AggregateId { get; }
93+
public int EnvelopeNumber { get; }
94+
95+
public ElasticsearchPerDocumentChangeException(Guid aggregateId, int envelopeNumber)
96+
{
97+
AggregateId = aggregateId;
98+
EnvelopeNumber = envelopeNumber;
8699
}
87100
}

0 commit comments

Comments
 (0)