Skip to content

Commit c9bb82d

Browse files
committed
chore: [LOB-1806] Reconciliation indexer endpoint
1 parent 5a5d8c2 commit c9bb82d

File tree

20 files changed

+2648
-24
lines changed

20 files changed

+2648
-24
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package org.cardanofoundation.lob.app.accounting_reporting_core.client.indexer;
2+
3+
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
4+
5+
import java.time.LocalDate;
6+
import java.util.ArrayList;
7+
import java.util.List;
8+
9+
import lombok.Getter;
10+
import lombok.RequiredArgsConstructor;
11+
import lombok.extern.slf4j.Slf4j;
12+
13+
import org.springframework.http.MediaType;
14+
import org.springframework.http.ResponseEntity;
15+
import org.springframework.web.client.RestClient;
16+
import org.springframework.web.client.RestClientException;
17+
import org.springframework.web.util.UriComponentsBuilder;
18+
19+
import com.fasterxml.jackson.core.JsonProcessingException;
20+
import com.fasterxml.jackson.databind.ObjectMapper;
21+
import io.vavr.control.Either;
22+
import org.zalando.problem.Problem;
23+
import org.zalando.problem.Status;
24+
25+
@Slf4j
26+
@RequiredArgsConstructor
27+
public class OnChainIndexerClient {
28+
29+
private static final String INDEXER_API_ERROR = "INDEXER_API_ERROR";
30+
private static final String TRANSACTIONS_BY_DATE_RANGE_PATH = "/api/v1/transactions/by-date-range";
31+
private static final int DEFAULT_PAGE_SIZE = 100;
32+
33+
private final ObjectMapper objectMapper;
34+
private final RestClient restClient;
35+
36+
@Getter
37+
private final String baseUrl;
38+
private final int pageSize;
39+
40+
public OnChainIndexerClient(ObjectMapper objectMapper, RestClient restClient, String baseUrl) {
41+
this(objectMapper, restClient, baseUrl, DEFAULT_PAGE_SIZE);
42+
}
43+
44+
public Either<Problem, List<OnChainTransactionDto>> retrieveTransactionsByDateRange(
45+
String organisationId,
46+
LocalDate dateFrom,
47+
LocalDate dateTo) {
48+
49+
log.info("Retrieving transactions from On-Chain Indexer for organisation: {}, dateFrom: {}, dateTo: {}",
50+
organisationId, dateFrom, dateTo);
51+
52+
List<OnChainTransactionDto> allTransactions = new ArrayList<>();
53+
int currentPage = 0;
54+
boolean hasMorePages = true;
55+
56+
while (hasMorePages) {
57+
Either<Problem, OnChainTransactionsPageResponse> pageResultE =
58+
fetchPage(organisationId, dateFrom, dateTo, currentPage);
59+
60+
if (pageResultE.isLeft()) {
61+
return Either.left(pageResultE.getLeft());
62+
}
63+
64+
OnChainTransactionsPageResponse pageResponse = pageResultE.get();
65+
allTransactions.addAll(pageResponse.content());
66+
67+
hasMorePages = !pageResponse.last();
68+
currentPage++;
69+
70+
log.debug("Fetched page {} with {} transactions. Total so far: {}. Has more: {}",
71+
currentPage - 1, pageResponse.content().size(), allTransactions.size(), hasMorePages);
72+
}
73+
74+
log.info("Successfully retrieved {} transactions from On-Chain Indexer", allTransactions.size());
75+
return Either.right(allTransactions);
76+
}
77+
78+
private Either<Problem, OnChainTransactionsPageResponse> fetchPage(
79+
String organisationId,
80+
LocalDate dateFrom,
81+
LocalDate dateTo,
82+
int page) {
83+
84+
String url = UriComponentsBuilder.fromHttpUrl(baseUrl)
85+
.path(TRANSACTIONS_BY_DATE_RANGE_PATH)
86+
.queryParam("page", page)
87+
.queryParam("size", pageSize)
88+
.toUriString();
89+
90+
OnChainTransactionSearchRequest requestBody = new OnChainTransactionSearchRequest(
91+
organisationId,
92+
ISO_LOCAL_DATE.format(dateFrom),
93+
ISO_LOCAL_DATE.format(dateTo)
94+
);
95+
96+
try {
97+
String requestJson = objectMapper.writeValueAsString(requestBody);
98+
log.debug("Calling On-Chain Indexer API: {} with body: {}", url, requestJson);
99+
100+
ResponseEntity<String> response = restClient.post()
101+
.uri(url)
102+
.contentType(MediaType.APPLICATION_JSON)
103+
.accept(MediaType.APPLICATION_JSON)
104+
.body(requestJson)
105+
.retrieve()
106+
.toEntity(String.class);
107+
108+
if (response.getStatusCode().is2xxSuccessful()) {
109+
OnChainTransactionsPageResponse pageResponse =
110+
objectMapper.readValue(response.getBody(), OnChainTransactionsPageResponse.class);
111+
return Either.right(pageResponse);
112+
}
113+
114+
log.error("On-Chain Indexer API returned error status: {}, body: {}",
115+
response.getStatusCode(), response.getBody());
116+
117+
return Either.left(Problem.builder()
118+
.withStatus(Status.valueOf(response.getStatusCode().value()))
119+
.withTitle(INDEXER_API_ERROR)
120+
.withDetail("Indexer API returned error: " + response.getBody())
121+
.build());
122+
123+
} catch (JsonProcessingException e) {
124+
log.error("Error parsing JSON response from On-Chain Indexer API: {}", e.getMessage());
125+
return Either.left(Problem.builder()
126+
.withStatus(Status.INTERNAL_SERVER_ERROR)
127+
.withTitle(INDEXER_API_ERROR)
128+
.withDetail("Error parsing JSON response: " + e.getMessage())
129+
.build());
130+
} catch (RestClientException e) {
131+
log.error("Error calling On-Chain Indexer API: {}", e.getMessage());
132+
return Either.left(Problem.builder()
133+
.withStatus(Status.SERVICE_UNAVAILABLE)
134+
.withTitle(INDEXER_API_ERROR)
135+
.withDetail("Error calling indexer API: " + e.getMessage())
136+
.build());
137+
}
138+
}
139+
140+
public Either<Problem, Void> testConnection() {
141+
log.info("Testing connection to On-Chain Indexer at: {}", baseUrl);
142+
143+
try {
144+
String url = UriComponentsBuilder.fromHttpUrl(baseUrl)
145+
.path(TRANSACTIONS_BY_DATE_RANGE_PATH)
146+
.queryParam("page", 0)
147+
.queryParam("size", 1)
148+
.toUriString();
149+
150+
OnChainTransactionSearchRequest requestBody = new OnChainTransactionSearchRequest(
151+
"test",
152+
ISO_LOCAL_DATE.format(LocalDate.now()),
153+
ISO_LOCAL_DATE.format(LocalDate.now())
154+
);
155+
156+
String requestJson = objectMapper.writeValueAsString(requestBody);
157+
158+
ResponseEntity<String> response = restClient.post()
159+
.uri(url)
160+
.contentType(MediaType.APPLICATION_JSON)
161+
.accept(MediaType.APPLICATION_JSON)
162+
.body(requestJson)
163+
.retrieve()
164+
.toEntity(String.class);
165+
166+
if (response.getStatusCode().is2xxSuccessful() || response.getStatusCode().is4xxClientError()) {
167+
log.info("On-Chain Indexer connection test successful");
168+
return Either.right(null);
169+
}
170+
171+
return Either.left(Problem.builder()
172+
.withStatus(Status.valueOf(response.getStatusCode().value()))
173+
.withTitle(INDEXER_API_ERROR)
174+
.withDetail("Connection test failed: " + response.getBody())
175+
.build());
176+
177+
} catch (Exception e) {
178+
log.error("Error testing connection to On-Chain Indexer: {}", e.getMessage());
179+
return Either.left(Problem.builder()
180+
.withStatus(Status.SERVICE_UNAVAILABLE)
181+
.withTitle(INDEXER_API_ERROR)
182+
.withDetail("Connection test failed: " + e.getMessage())
183+
.build());
184+
}
185+
}
186+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package org.cardanofoundation.lob.app.accounting_reporting_core.client.indexer;
2+
3+
import java.util.List;
4+
5+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
6+
import com.fasterxml.jackson.annotation.JsonProperty;
7+
8+
@JsonIgnoreProperties(ignoreUnknown = true)
9+
public record OnChainTransactionDto(
10+
@JsonProperty("id") String id,
11+
@JsonProperty("tx_hash") String txHash,
12+
@JsonProperty("internal_number") String internalNumber,
13+
@JsonProperty("accounting_period") String accountingPeriod,
14+
@JsonProperty("batch_id") String batchId,
15+
@JsonProperty("type") String type,
16+
@JsonProperty("date") String date,
17+
@JsonProperty("organisation_id") String organisationId,
18+
@JsonProperty("items") List<OnChainTransactionItemDto> items
19+
) {
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package org.cardanofoundation.lob.app.accounting_reporting_core.client.indexer;
2+
3+
import java.math.BigDecimal;
4+
5+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
6+
import com.fasterxml.jackson.annotation.JsonProperty;
7+
8+
@JsonIgnoreProperties(ignoreUnknown = true)
9+
public record OnChainTransactionItemDto(
10+
@JsonProperty("id") String id,
11+
@JsonProperty("amountFcy") BigDecimal amountFcy,
12+
@JsonProperty("fxRate") String fxRate,
13+
@JsonProperty("documentNumber") String documentNumber,
14+
@JsonProperty("currency") String currency,
15+
@JsonProperty("costCenterName") String costCenterName,
16+
@JsonProperty("costCenterCustCode") String costCenterCustCode,
17+
@JsonProperty("vatRate") String vatRate,
18+
@JsonProperty("vatCustCode") String vatCustCode,
19+
@JsonProperty("eventCode") String eventCode,
20+
@JsonProperty("eventName") String eventName,
21+
@JsonProperty("projectCustCode") String projectCustCode,
22+
@JsonProperty("projectName") String projectName
23+
) {
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package org.cardanofoundation.lob.app.accounting_reporting_core.client.indexer;
2+
3+
import com.fasterxml.jackson.annotation.JsonProperty;
4+
5+
public record OnChainTransactionSearchRequest(
6+
@JsonProperty("organisationId") String organisationId,
7+
@JsonProperty("dateFrom") String dateFrom,
8+
@JsonProperty("dateTo") String dateTo
9+
) {
10+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.cardanofoundation.lob.app.accounting_reporting_core.client.indexer;
2+
3+
import java.util.List;
4+
5+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
6+
import com.fasterxml.jackson.annotation.JsonProperty;
7+
8+
@JsonIgnoreProperties(ignoreUnknown = true)
9+
public record OnChainTransactionsPageResponse(
10+
@JsonProperty("content") List<OnChainTransactionDto> content,
11+
@JsonProperty("total_elements") long totalElements,
12+
@JsonProperty("total_pages") int totalPages,
13+
@JsonProperty("last") boolean last,
14+
@JsonProperty("size") int size,
15+
@JsonProperty("number") int number,
16+
@JsonProperty("first") boolean first,
17+
@JsonProperty("empty") boolean empty
18+
) {
19+
}

accounting_reporting_core/src/main/java/org/cardanofoundation/lob/app/accounting_reporting_core/domain/entity/TransactionItemEntity.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,4 +256,7 @@ public int hashCode() {
256256
return Objects.hashCode(id);
257257
}
258258

259+
public int aggregatedHash() {
260+
return java.util.Objects.hash(accountEvent, fxRate, project, costCenter, document);
261+
}
259262
}

accounting_reporting_core/src/main/java/org/cardanofoundation/lob/app/accounting_reporting_core/service/internal/AccountingCoreEventHandler.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.util.Map;
66
import java.util.Optional;
77
import java.util.Set;
8+
import java.util.stream.Collectors;
89

910
import lombok.RequiredArgsConstructor;
1011
import lombok.extern.slf4j.Slf4j;
@@ -187,6 +188,19 @@ public void handleReconcilationChunkEvent(ReconcilationChunkEvent event) {
187188
chunkDetachedTxEntities
188189
);
189190

191+
// Trigger indexer reconciliation for this chunk
192+
Set<String> chunkTxIds = transactions.stream()
193+
.map(Transaction::getId)
194+
.collect(Collectors.toSet());
195+
196+
transactionReconcilationService.reconcileWithIndexer(
197+
reconcilationId,
198+
organisationId,
199+
fromDate,
200+
toDate,
201+
chunkTxIds
202+
);
203+
190204
log.info("Finished processing handleReconcilationChunkEvent, event: {}", event);
191205

192206
applicationEventPublisher.publishEvent(ReconcilationFinalisationEvent.builder()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package org.cardanofoundation.lob.app.accounting_reporting_core.service.internal;
2+
3+
import java.time.LocalDate;
4+
import java.util.Map;
5+
import java.util.Set;
6+
7+
import io.vavr.control.Either;
8+
import org.zalando.problem.Problem;
9+
10+
import org.cardanofoundation.lob.app.accounting_reporting_core.domain.core.reconcilation.ReconcilationCode;
11+
import org.cardanofoundation.lob.app.accounting_reporting_core.domain.entity.TransactionEntity;
12+
13+
/**
14+
* Interface for indexer reconciliation services.
15+
* Implementations provide the ability to reconcile transactions with an on-chain indexer.
16+
*/
17+
public interface IndexerReconcilationServiceIF {
18+
19+
/**
20+
* Reconciles transactions from the database with transactions from the On-Chain Indexer.
21+
*
22+
* @param organisationId The organisation ID to reconcile
23+
* @param dateFrom Start date for reconciliation
24+
* @param dateTo End date for reconciliation
25+
* @param dbTransactions Transactions from the database to compare
26+
* @return Either a Problem if the API call fails, or a Map of transaction IDs to their reconciliation status
27+
*/
28+
Either<Problem, Map<String, IndexerReconcilationResult>> reconcileWithIndexer(
29+
String organisationId,
30+
LocalDate dateFrom,
31+
LocalDate dateTo,
32+
Set<TransactionEntity> dbTransactions);
33+
34+
/**
35+
* Result of reconciliation for a single transaction
36+
*/
37+
record IndexerReconcilationResult(
38+
ReconcilationCode status,
39+
String mismatchReason
40+
) {
41+
}
42+
}

0 commit comments

Comments
 (0)