Skip to content

Commit 932e815

Browse files
committed
fix: or-2893 wip, amend please
1 parent 63ad532 commit 932e815

File tree

3 files changed

+37
-23
lines changed

3 files changed

+37
-23
lines changed

src/OrganisationRegistry.ElasticSearch.Projections/BaseRunner.cs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ namespace OrganisationRegistry.ElasticSearch.Projections;
88
using System.Threading.Tasks;
99
using Client;
1010
using Configuration;
11+
using ElasticSearch.Organisations;
1112
using Infrastructure;
1213
using Infrastructure.Change;
1314
using Microsoft.Extensions.Logging;
@@ -128,21 +129,29 @@ public async Task Run()
128129
_metrics.NumberOfEnvelopesHandledGauge = envelopes.Count;
129130
_metrics.NumberOfEnvelopesHandledCounter = envelopes.Count;
130131
}
131-
catch (ElasticsearchOrganisationNotFoundException organisationNotFoundException)
132+
catch (ElasticsearchAggregateNotFoundException organisationNotFoundException)
132133
{
133134
await using var organisationRegistryContext = _contextFactory.Create();
135+
136+
// TODO: switch over document type, place line in right xxxToRebuild table
137+
switch (typeof(T).Name)
138+
{
139+
case nameof(OrganisationDocument):
140+
break;
141+
}
142+
134143
organisationRegistryContext.OrganisationsToRebuild.Add(
135144
new OrganisationToRebuild
136145
{
137-
OrganisationId = Guid.Parse(organisationNotFoundException.OrganisationId)
146+
OrganisationId = Guid.Parse(organisationNotFoundException.AggregateId)
138147
});
139148
await organisationRegistryContext.SaveChangesAsync();
140149
_logger.LogWarning(
141150
0,
142151
organisationNotFoundException,
143152
"[{ProjectionName}] Could not find {OrganisationId} in ES while processing envelope #{EnvelopeNumber}, adding it to organisations to rebuild",
144153
ProjectionName,
145-
organisationNotFoundException.OrganisationId,
154+
organisationNotFoundException.AggregateId,
146155
newLastProcessedEventNumber);
147156
throw;
148157
}

src/OrganisationRegistry.ElasticSearch.Projections/Infrastructure/Change/ElasticPerDocumentChange.cs

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,34 +2,27 @@ namespace OrganisationRegistry.ElasticSearch.Projections.Infrastructure.Change;
22

33
using System;
44
using System.Collections.Generic;
5+
using System.Linq;
56
using System.Threading.Tasks;
7+
using Humanizer;
68

79
public class ElasticPerDocumentChange<T> : IElasticChange where T: IDocument
810
{
911
public ElasticPerDocumentChange(Guid id, Action<T> change)
1012
{
1113
Changes = new Dictionary<Guid, Func<T, Task>> {{id, doc =>
1214
{
15+
try{
1316
change(doc);
1417
return Task.CompletedTask;
18+
}catch (InvalidOperationException)
19+
{
20+
return Task.FromException(new ElasticsearchPerDocumentChangeException());
21+
}
1522
}
1623
}};
1724
}
1825

19-
public ElasticPerDocumentChange(Dictionary<Guid, Action<T>> changes)
20-
{
21-
Changes = new Dictionary<Guid, Func<T, Task>>();
22-
foreach (var change in changes)
23-
{
24-
Func<T, Task> changeAction = doc =>
25-
{
26-
change.Value(doc);
27-
return Task.CompletedTask;
28-
};
29-
30-
Changes.Add(change.Key, changeAction);
31-
}
32-
}
3326

3427
public ElasticPerDocumentChange(Guid id, Func<T, Task> change)
3528
{
@@ -43,3 +36,4 @@ public ElasticPerDocumentChange(Dictionary<Guid, Func<T, Task>> changes)
4336

4437
public Dictionary<Guid, Func<T, Task>> Changes { get; init; }
4538
}
39+

src/OrganisationRegistry.ElasticSearch/Client/ThrowOnFailureExtension.cs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ public static IGetResponse<T> ThrowOnFailure<T>(this IGetResponse<T> response) w
2222
{
2323
if (response.IsValid) return response;
2424

25-
if (response.ApiCall.HttpStatusCode == 404 && typeof(T) == typeof(OrganisationDocument))
26-
throw new ElasticsearchOrganisationNotFoundException(((GetResponse<OrganisationDocument>)response).Id);
25+
if (response.ApiCall.HttpStatusCode == 404)
26+
throw new ElasticsearchAggregateNotFoundException(((GetResponse<T>)response).Id);
2727

2828
throw new ElasticsearchException(response.DebugInformation, response.OriginalException);
2929
}
@@ -76,12 +76,23 @@ public static void ThrowOnFailure(this BulkResponse response)
7676
}
7777
}
7878

79-
public class ElasticsearchOrganisationNotFoundException : ElasticsearchException
79+
public class ElasticsearchAggregateNotFoundException : ElasticsearchException
8080
{
81-
public string OrganisationId { get; }
81+
public string AggregateId { get; }
8282

83-
public ElasticsearchOrganisationNotFoundException(string organisationId)
83+
public ElasticsearchAggregateNotFoundException(string aggregateId)
8484
{
85-
OrganisationId = organisationId;
85+
AggregateId = aggregateId;
86+
}
87+
}
88+
89+
90+
public class ElasticsearchPerDocumentChangeException : ElasticsearchException
91+
{
92+
public string AggregateId { get; }
93+
94+
public ElasticsearchPerDocumentChangeException(string aggregateId)
95+
{
96+
AggregateId = aggregateId;
8697
}
8798
}

0 commit comments

Comments
 (0)