Skip to content

Commit 23b1bed

Browse files
committed
Added the implementation for the SAM Aggregator step + Data Bridge client hooked up.
1 parent d2e800e commit 23b1bed

14 files changed

Lines changed: 436 additions & 99 deletions

File tree

src/KeeperData.Application/Orchestration/Sam/Inserts/SamHoldingInsertContext.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ public class SamHoldingInsertContext
99
public required string Cph { get; init; }
1010
public required int BatchId { get; init; }
1111

12-
public SamCphHolding? RawHolding { get; set; }
12+
public List<SamCphHolding> RawHoldings { get; set; } = [];
1313
public List<SamCphHolder> RawHolders { get; set; } = [];
14-
public List<SamParty> RawParties { get; set; } = [];
1514
public List<SamHerd> RawHerds { get; set; } = [];
15+
public List<SamParty> RawParties { get; set; } = [];
1616

17-
public SamHoldingDocument? SilverHolding { get; set; }
17+
public List<SamHoldingDocument> SilverHoldings { get; set; } = [];
1818
public List<SamPartyDocument> SilverParties { get; set; } = [];
1919
public List<PartyRoleRelationshipDocument> SilverPartyRoles { get; set; } = [];
2020

src/KeeperData.Application/Orchestration/Sam/Inserts/Steps/SamHoldingInsertAggregationStep.cs

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,43 +6,45 @@
66
namespace KeeperData.Application.Orchestration.Sam.Inserts.Steps;
77

88
[StepOrder(1)]
9-
public class SamHoldingInsertAggregationStep : ImportStepBase<SamHoldingInsertContext>
9+
public class SamHoldingInsertAggregationStep(
10+
IDataBridgeClient dataBridgeClient,
11+
ILogger<SamHoldingInsertAggregationStep> logger) : ImportStepBase<SamHoldingInsertContext>(logger)
1012
{
11-
private readonly IHttpClientFactory _httpClientFactory;
12-
private readonly HttpClient _httpClient;
13-
14-
private const string ClientName = "DataBridgeApi";
15-
16-
public SamHoldingInsertAggregationStep(
17-
IHttpClientFactory httpClientFactory,
18-
ILogger<SamHoldingInsertAggregationStep> logger)
19-
: base(logger)
20-
{
21-
_httpClientFactory = httpClientFactory;
22-
_httpClient = _httpClientFactory.CreateClient(ClientName);
23-
}
13+
private readonly IDataBridgeClient _dataBridgeClient = dataBridgeClient;
2414

2515
protected override async Task ExecuteCoreAsync(SamHoldingInsertContext context, CancellationToken cancellationToken)
2616
{
27-
// Make API calls using _httpClient using Cph and BatchId
28-
var samCphHolding = new SamCphHolding
29-
{
30-
BATCH_ID = 1,
31-
CHANGE_TYPE = "I"
32-
};
17+
var getHoldingsTask = _dataBridgeClient.GetSamHoldingsAsync(context.Cph, cancellationToken);
18+
var getHoldersTask = _dataBridgeClient.GetSamHoldersAsync(context.Cph, cancellationToken);
19+
var getHerdsTask = _dataBridgeClient.GetSamHerdsAsync(context.Cph, cancellationToken);
3320

34-
if (samCphHolding is not { CHANGE_TYPE: DataBridgeConstants.ChangeTypeInsert })
35-
return;
21+
await Task.WhenAll(
22+
getHoldingsTask,
23+
getHoldersTask,
24+
getHerdsTask);
3625

37-
// Construct Raw model
38-
context.RawHolding = samCphHolding;
26+
context.RawHoldings = getHoldingsTask.Result;
3927

40-
context.RawHolders = [];
28+
context.RawHolders = getHoldersTask.Result;
4129

42-
context.RawHerds = [];
30+
context.RawHerds = getHerdsTask.Result;
4331

44-
context.RawParties = [];
32+
context.RawParties = await GetSamPartiesAsync(context, cancellationToken);
4533

4634
await Task.CompletedTask;
4735
}
36+
37+
private async Task<List<SamParty>> GetSamPartiesAsync(SamHoldingInsertContext context, CancellationToken cancellationToken)
38+
{
39+
var uniquePartyIds = (context.RawHerds ?? Enumerable.Empty<SamHerd>())
40+
.SelectMany(h => h.KeeperPartyIdList
41+
.Union(h.OwnerPartyIdList, StringComparer.OrdinalIgnoreCase))
42+
.Distinct(StringComparer.OrdinalIgnoreCase)
43+
.ToList();
44+
45+
if (uniquePartyIds.Count == 0)
46+
return [];
47+
48+
return await _dataBridgeClient.GetSamPartiesAsync(uniquePartyIds, cancellationToken);
49+
}
4850
}

src/KeeperData.Application/Orchestration/Sam/Inserts/Steps/SamHoldingInsertGoldMappingStep.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ public class SamHoldingInsertGoldMappingStep(ILogger<SamHoldingInsertGoldMapping
1010
{
1111
protected override async Task ExecuteCoreAsync(SamHoldingInsertContext context, CancellationToken cancellationToken)
1212
{
13-
if (context is not { RawHolding.CHANGE_TYPE: DataBridgeConstants.ChangeTypeInsert })
14-
return;
13+
//if (context is not { RawHolding.CHANGE_TYPE: DataBridgeConstants.ChangeTypeInsert })
14+
// return;
1515

1616
// TODO - Add implementation
1717

src/KeeperData.Application/Orchestration/Sam/Inserts/Steps/SamHoldingInsertPersistenceStep.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,20 @@ public class SamHoldingInsertPersistenceStep(
2323

2424
protected override async Task ExecuteCoreAsync(SamHoldingInsertContext context, CancellationToken cancellationToken)
2525
{
26-
if (context is not { RawHolding.CHANGE_TYPE: DataBridgeConstants.ChangeTypeInsert })
27-
return;
26+
//if (context is not { RawHolding.CHANGE_TYPE: DataBridgeConstants.ChangeTypeInsert })
27+
// return;
2828

29-
if (context.SilverHolding is not null)
30-
await _silverHoldingRepository.BulkUpsertAsync([context.SilverHolding], cancellationToken);
29+
//if (context.SilverHolding is not null)
30+
// await _silverHoldingRepository.BulkUpsertAsync([context.SilverHolding], cancellationToken);
3131

32-
if (context.SilverParties is not null)
33-
await _silverPartyRepository.BulkUpsertAsync(context.SilverParties, cancellationToken);
32+
//if (context.SilverParties is not null)
33+
// await _silverPartyRepository.BulkUpsertAsync(context.SilverParties, cancellationToken);
3434

35-
if (context.GoldSite is not null)
36-
await _goldSiteRepository.BulkUpsertAsync([context.GoldSite], cancellationToken);
35+
//if (context.GoldSite is not null)
36+
// await _goldSiteRepository.BulkUpsertAsync([context.GoldSite], cancellationToken);
3737

38-
if (context.GoldParties is not null)
39-
await _goldPartyRepository.BulkUpsertAsync(context.GoldParties, cancellationToken);
38+
//if (context.GoldParties is not null)
39+
// await _goldPartyRepository.BulkUpsertAsync(context.GoldParties, cancellationToken);
4040

4141
await Task.CompletedTask;
4242
}

src/KeeperData.Application/Orchestration/Sam/Inserts/Steps/SamHoldingInsertSilverMappingStep.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,17 @@ public class SamHoldingInsertSilverMappingStep(ILogger<SamHoldingInsertSilverMap
1111
{
1212
protected override async Task ExecuteCoreAsync(SamHoldingInsertContext context, CancellationToken cancellationToken)
1313
{
14-
if (context is not { RawHolding.CHANGE_TYPE: DataBridgeConstants.ChangeTypeInsert })
15-
return;
14+
//if (context is not { RawHolding.CHANGE_TYPE: DataBridgeConstants.ChangeTypeInsert })
15+
// return;
1616

17-
context.SilverHolding = SamHoldingMapper.ToSilver(context.RawHolding);
17+
//context.SilverHolding = SamHoldingMapper.ToSilver(context.RawHolding);
1818

19-
context.SilverParties = [
20-
.. SamHolderMapper.ToSilver(context.RawHolders),
21-
.. SamPartyMapper.ToSilver(context.RawParties, context.RawHerds)
22-
];
19+
//context.SilverParties = [
20+
// .. SamHolderMapper.ToSilver(context.RawHolders),
21+
// .. SamPartyMapper.ToSilver(context.RawParties, context.RawHerds)
22+
//];
2323

24-
context.SilverPartyRoles = []; // Map From SilverParties
24+
//context.SilverPartyRoles = []; // Map From SilverParties
2525

2626
await Task.CompletedTask;
2727
}

src/KeeperData.Core/ApiClients/DataBridgeApi/DataBridgeQueries.cs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,48 @@ public static Dictionary<string, string> CtsKeepersByLidFullIdentifier(string id
2525
["$filter"] = $"LID_FULL_IDENTIFIER eq '{id}'"
2626
};
2727
}
28+
29+
public static Dictionary<string, string> SamHoldingsByCph(string id)
30+
{
31+
return new Dictionary<string, string>
32+
{
33+
["$filter"] = $"CPH eq '{id}'"
34+
};
35+
}
36+
37+
public static Dictionary<string, string> SamHoldersByCph(string id)
38+
{
39+
return new Dictionary<string, string>
40+
{
41+
["$filter"] = $"contains(CPHS, '{id}')"
42+
};
43+
}
44+
45+
public static Dictionary<string, string> SamHerdsByCph(string id)
46+
{
47+
return new Dictionary<string, string>
48+
{
49+
["$filter"] = $"CPHH eq '{id}'"
50+
};
51+
}
52+
53+
public static Dictionary<string, string> SamPartyByPartyId(string id)
54+
{
55+
return new Dictionary<string, string>
56+
{
57+
["$filter"] = $"PARTY_ID eq '{id}'"
58+
};
59+
}
60+
61+
public static Dictionary<string, string> SamPartiesByPartyIds(IEnumerable<string> ids)
62+
{
63+
var filter = string.Join(" or ", ids
64+
.Where(id => !string.IsNullOrWhiteSpace(id))
65+
.Select(id => $"PARTY_ID eq '{id}'"));
66+
67+
return new Dictionary<string, string>
68+
{
69+
["$filter"] = filter
70+
};
71+
}
2872
}

src/KeeperData.Core/ApiClients/DataBridgeApi/IDataBridgeClient.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ namespace KeeperData.Core.ApiClients.DataBridgeApi;
44

55
public interface IDataBridgeClient
66
{
7-
Task<SamCphHolding> GetSamHoldingAsync(string id, CancellationToken cancellationToken);
7+
Task<List<SamCphHolding>> GetSamHoldingsAsync(string id, CancellationToken cancellationToken);
88
Task<List<SamCphHolder>> GetSamHoldersAsync(string id, CancellationToken cancellationToken);
9-
Task<List<SamParty>> GetSamPartiesAsync(string id, CancellationToken cancellationToken);
109
Task<List<SamHerd>> GetSamHerdsAsync(string id, CancellationToken cancellationToken);
10+
Task<SamParty> GetSamPartyAsync(string id, CancellationToken cancellationToken);
11+
Task<List<SamParty>> GetSamPartiesAsync(IEnumerable<string> ids, CancellationToken cancellationToken);
12+
1113

1214
Task<List<CtsCphHolding>> GetCtsHoldingsAsync(string id, CancellationToken cancellationToken);
1315
Task<List<CtsAgentOrKeeper>> GetCtsAgentsAsync(string id, CancellationToken cancellationToken);

src/KeeperData.Infrastructure/ApiClients/DataBridgeClient.cs

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,59 @@ public class DataBridgeClient(IHttpClientFactory factory, ILogger<DataBridgeClie
1515

1616
private const string ClientName = "DataBridgeApi";
1717

18-
public Task<SamCphHolding> GetSamHoldingAsync(string id, CancellationToken cancellationToken)
18+
public async Task<List<SamCphHolding>> GetSamHoldingsAsync(string id, CancellationToken cancellationToken)
1919
{
20-
throw new NotImplementedException();
20+
var query = DataBridgeQueries.SamHoldingsByCph(id);
21+
var uri = UriTemplate.Resolve(DataBridgeApiRoutes.GetSamHoldings, new { }, query);
22+
23+
return await GetFromApiAsync<List<SamCphHolding>>(
24+
uri,
25+
$"Sam holdings for ID '{id}'",
26+
cancellationToken);
27+
}
28+
29+
public async Task<List<SamCphHolder>> GetSamHoldersAsync(string id, CancellationToken cancellationToken)
30+
{
31+
var query = DataBridgeQueries.SamHoldersByCph(id);
32+
var uri = UriTemplate.Resolve(DataBridgeApiRoutes.GetSamHolders, new { }, query);
33+
34+
return await GetFromApiAsync<List<SamCphHolder>>(
35+
uri,
36+
$"Sam holders for ID '{id}'",
37+
cancellationToken);
2138
}
2239

23-
public Task<List<SamCphHolder>> GetSamHoldersAsync(string id, CancellationToken cancellationToken)
40+
public async Task<List<SamHerd>> GetSamHerdsAsync(string id, CancellationToken cancellationToken)
2441
{
25-
throw new NotImplementedException();
42+
var query = DataBridgeQueries.SamHerdsByCph(id);
43+
var uri = UriTemplate.Resolve(DataBridgeApiRoutes.GetSamHerds, new { }, query);
44+
45+
return await GetFromApiAsync<List<SamHerd>>(
46+
uri,
47+
$"Sam herds for ID '{id}'",
48+
cancellationToken);
2649
}
2750

28-
public Task<List<SamParty>> GetSamPartiesAsync(string id, CancellationToken cancellationToken)
51+
public async Task<SamParty> GetSamPartyAsync(string id, CancellationToken cancellationToken)
2952
{
30-
throw new NotImplementedException();
53+
var query = DataBridgeQueries.SamPartyByPartyId(id);
54+
var uri = UriTemplate.Resolve(DataBridgeApiRoutes.GetSamParties, new { }, query);
55+
56+
return await GetFromApiAsync<SamParty>(
57+
uri,
58+
$"Sam party for ID '{id}'",
59+
cancellationToken);
3160
}
3261

33-
public Task<List<SamHerd>> GetSamHerdsAsync(string id, CancellationToken cancellationToken)
62+
public async Task<List<SamParty>> GetSamPartiesAsync(IEnumerable<string> ids, CancellationToken cancellationToken)
3463
{
35-
throw new NotImplementedException();
64+
var query = DataBridgeQueries.SamPartiesByPartyIds(ids);
65+
var uri = UriTemplate.Resolve(DataBridgeApiRoutes.GetSamParties, new { }, query);
66+
67+
return await GetFromApiAsync<List<SamParty>>(
68+
uri,
69+
$"Sam parties for IDs '{string.Join(",", ids)}'",
70+
cancellationToken);
3671
}
3772

3873
public async Task<List<CtsCphHolding>> GetCtsHoldingsAsync(string id, CancellationToken cancellationToken)
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
using FluentAssertions;
2+
using KeeperData.Application.Orchestration.Sam.Inserts;
3+
using KeeperData.Core.ApiClients.DataBridgeApi;
4+
using KeeperData.Core.Messaging.Consumers;
5+
using KeeperData.Tests.Common.Factories;
6+
using KeeperData.Tests.Common.Utilities;
7+
using Microsoft.Extensions.DependencyInjection;
8+
using Moq;
9+
using Moq.Contrib.HttpClient;
10+
using System.Net;
11+
12+
namespace KeeperData.Api.Tests.Component.Orchestration.Sam.Inserts;
13+
14+
public class SamHoldingInsertOrchestratorTests
15+
{
16+
[Fact]
17+
public async Task GivenAHoldingIdentifier_WhenExecutingSamHoldingInsertOrchestrator_ShouldProcessAllStepsSuccessfully()
18+
{
19+
var (holdingIdentifier, holdings, holders, herds, parties) = new MockSamDataFactory().CreateMockData(
20+
changeType: DataBridgeConstants.ChangeTypeInsert,
21+
holdingCount: 1,
22+
holderCount: 1,
23+
herdCount: 1,
24+
partyCount: 1);
25+
26+
var holdingsUri = RequestUriUtilities.GetQueryUri(
27+
DataBridgeApiRoutes.GetSamHoldings,
28+
new { },
29+
DataBridgeQueries.SamHoldingsByCph(holdingIdentifier));
30+
31+
var holdersUri = RequestUriUtilities.GetQueryUri(
32+
DataBridgeApiRoutes.GetSamHolders,
33+
new { },
34+
DataBridgeQueries.SamHoldersByCph(holdingIdentifier));
35+
36+
var herdsUri = RequestUriUtilities.GetQueryUri(
37+
DataBridgeApiRoutes.GetSamHerds,
38+
new { },
39+
DataBridgeQueries.SamHerdsByCph(holdingIdentifier));
40+
41+
var partiesUri = RequestUriUtilities.GetQueryUri(
42+
DataBridgeApiRoutes.GetSamParties,
43+
new { },
44+
DataBridgeQueries.SamPartiesByPartyIds(parties.Select(x => x.PARTY_ID)));
45+
46+
var factory = new AppWebApplicationFactory();
47+
48+
SetupDataBridgeApiRequest(factory, holdingsUri, HttpStatusCode.OK, HttpContentUtility.CreateResponseContent(holdings));
49+
SetupDataBridgeApiRequest(factory, holdersUri, HttpStatusCode.OK, HttpContentUtility.CreateResponseContent(holders));
50+
SetupDataBridgeApiRequest(factory, herdsUri, HttpStatusCode.OK, HttpContentUtility.CreateResponseContent(herds));
51+
SetupDataBridgeApiRequest(factory, partiesUri, HttpStatusCode.OK, HttpContentUtility.CreateResponseContent(parties));
52+
53+
var result = await ExecuteTestAsync(factory, holdingIdentifier);
54+
55+
VerifyDataBridgeApiEndpointCalled(factory, holdingsUri, Times.Once());
56+
VerifyDataBridgeApiEndpointCalled(factory, holdersUri, Times.Once());
57+
VerifyDataBridgeApiEndpointCalled(factory, herdsUri, Times.Once());
58+
VerifyDataBridgeApiEndpointCalled(factory, partiesUri, Times.Once());
59+
60+
result.RawHoldings.Should().NotBeNull().And.HaveCount(1);
61+
result.RawHoldings[0].CPH.Should().Be(holdingIdentifier);
62+
63+
result.RawHolders.Should().NotBeNull().And.HaveCount(1);
64+
result.RawHolders[0].CphList.Should().NotBeNull().And.Contain(holdingIdentifier);
65+
66+
result.RawHerds.Should().NotBeNull().And.HaveCount(1);
67+
result.RawHerds[0].CPHH.Should().Be(holdingIdentifier);
68+
69+
result.RawParties.Should().NotBeNull().And.HaveCount(1);
70+
result.RawParties[0].PARTY_ID.Should().Be(parties[0].PARTY_ID);
71+
}
72+
73+
private static async Task<SamHoldingInsertContext> ExecuteTestAsync(AppWebApplicationFactory factory, string holdingIdentifier)
74+
{
75+
var mockPoller = new Mock<IQueuePoller>();
76+
factory.OverrideServiceAsSingleton(mockPoller.Object);
77+
78+
var context = new SamHoldingInsertContext
79+
{
80+
Cph = holdingIdentifier,
81+
BatchId = 1
82+
};
83+
84+
using var scope = factory.Services.CreateAsyncScope();
85+
var orchestrator = scope.ServiceProvider.GetRequiredService<SamHoldingInsertOrchestrator>();
86+
await orchestrator.ExecuteAsync(context, CancellationToken.None);
87+
88+
return context;
89+
}
90+
91+
private static void SetupDataBridgeApiRequest(AppWebApplicationFactory factory, string uri, HttpStatusCode httpStatusCode, StringContent httpResponseMessage)
92+
{
93+
factory.DataBridgeApiClientHttpMessageHandlerMock.SetupRequest(HttpMethod.Get, $"{TestConstants.DataBridgeApiBaseUrl}/{uri}")
94+
.ReturnsResponse(httpStatusCode, httpResponseMessage);
95+
}
96+
97+
private static void VerifyDataBridgeApiEndpointCalled(AppWebApplicationFactory factory, string requestUrl, Times times)
98+
{
99+
factory.DataBridgeApiClientHttpMessageHandlerMock.VerifyRequest(HttpMethod.Get, $"{TestConstants.DataBridgeApiBaseUrl}/{requestUrl}", times);
100+
}
101+
}

0 commit comments

Comments
 (0)