Skip to content

Commit af2593d

Browse files
[SELC-8138] Fixes to sendDelegationEvents service
1 parent 205ccc7 commit af2593d

File tree

10 files changed

+119
-88
lines changed

10 files changed

+119
-88
lines changed

apps/institution-ms/connector-api/src/main/java/it/pagopa/selfcare/mscore/api/DelegationConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public interface DelegationConnector {
1717

1818
List<Delegation> find(String from, String to, String productId, String search, String taxCode, Order order, Integer page, Integer size);
1919

20-
DelegationWithCursorPagination findFromDate(OffsetDateTime fromDate, Long cursor, Integer size);
20+
DelegationWithPagination findFromDate(OffsetDateTime fromDate, Integer page, Integer size);
2121

2222
DelegationWithPagination findAndCount(GetDelegationParameters delegationParameters);
2323

apps/institution-ms/connector-api/src/main/java/it/pagopa/selfcare/mscore/model/delegation/DelegationWithCursorPagination.java

Lines changed: 0 additions & 19 deletions
This file was deleted.

apps/institution-ms/connector-api/src/main/java/it/pagopa/selfcare/mscore/model/delegation/PageInfo.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22

33
import com.fasterxml.jackson.annotation.JsonInclude;
44
import lombok.AllArgsConstructor;
5+
import lombok.Builder;
56
import lombok.Data;
67
import lombok.NoArgsConstructor;
78

89
@Data
910
@NoArgsConstructor
1011
@AllArgsConstructor
1112
@JsonInclude(JsonInclude.Include.NON_NULL)
13+
@Builder
1214
public class PageInfo {
1315
private long pageSize;
1416
private long pageNo;

apps/institution-ms/connector/dao/src/main/java/it/pagopa/selfcare/mscore/connector/dao/DelegationConnectorImpl.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import it.pagopa.selfcare.mscore.api.DelegationConnector;
44
import it.pagopa.selfcare.mscore.connector.dao.model.DelegationEntity;
55
import it.pagopa.selfcare.mscore.connector.dao.model.DelegationInstitutionEntity;
6+
import it.pagopa.selfcare.mscore.connector.dao.model.DelegationPageEntity;
67
import it.pagopa.selfcare.mscore.connector.dao.model.mapper.DelegationEntityMapper;
78
import it.pagopa.selfcare.mscore.constant.DelegationState;
89
import it.pagopa.selfcare.mscore.constant.DelegationType;
@@ -28,7 +29,6 @@
2829
import java.time.ZoneId;
2930
import java.util.*;
3031
import java.util.regex.Pattern;
31-
import java.util.stream.Collectors;
3232

3333
@Slf4j
3434
@Component
@@ -120,24 +120,26 @@ public List<Delegation> find(String from, String to, String productId, String se
120120
}
121121

122122
@Override
123-
public DelegationWithCursorPagination findFromDate(OffsetDateTime fromDate, Long cursor, Integer size) {
124-
final Query query = Query.query(new Criteria().orOperator(
125-
Criteria.where(DelegationEntity.Fields.updatedAt.name()).gte(fromDate),
126-
Criteria.where(DelegationEntity.Fields.createdAt.name()).gte(fromDate)
127-
)).with(Sort.by(Sort.Direction.ASC, DelegationEntity.Fields.createdAt.name())).limit(Optional.ofNullable(size).orElse(100));
128-
129-
Optional.ofNullable(cursor).ifPresent(c ->
130-
query.addCriteria(Criteria.where(DelegationEntity.Fields.createdAt.name())
131-
.gt(OffsetDateTime.ofInstant(Instant.ofEpochMilli(c), ZoneId.systemDefault())))
132-
);
123+
public DelegationWithPagination findFromDate(OffsetDateTime fromDate, Integer page, Integer size) {
124+
final long effectivePage = page != null && page > 0 ? page : 0;
125+
final long effectiveSize = size != null && size > 0 ? size : 100;
126+
127+
final MatchOperation matchOperation = Aggregation.match(Criteria.where(DelegationEntity.Fields.createdAt.name()).gte(fromDate));
128+
129+
final SortOperation sortOperation = Aggregation.sort(Sort.Direction.ASC, "createdAt");
133130

134-
final List<Delegation> delegations = repository.find(query, DelegationEntity.class)
135-
.stream().map(delegationMapper::convertToDelegation)
136-
.collect(Collectors.toCollection(ArrayList::new));
137-
final Long lastElementCursor = !delegations.isEmpty() ?
138-
delegationMapper.convertToTimestampId(delegations.get(delegations.size() - 1).getCreatedAt()) : null;
131+
final FacetOperation facetOperation = Aggregation.facet(Aggregation.count().as("totalElements")).as("pageInfo")
132+
.and(Aggregation.skip(effectivePage * effectiveSize), Aggregation.limit(effectiveSize)).as("delegations");
139133

140-
return new DelegationWithCursorPagination(delegations, lastElementCursor);
134+
final Aggregation aggregation = Aggregation.newAggregation(matchOperation, sortOperation, facetOperation);
135+
136+
final DelegationPageEntity delegationPageEntity = mongoTemplate.aggregate(aggregation, COLLECTION_DELEGATIONS, DelegationPageEntity.class)
137+
.getMappedResults().stream().findFirst().orElseThrow();
138+
139+
final List<Delegation> delegations = delegationPageEntity.getDelegations().stream().map(delegationMapper::convertToDelegation).toList();
140+
final long totalElements = delegationPageEntity.getPageInfo().stream().findFirst().map(DelegationPageEntity.PageInfo::getTotalElements).orElse(0L);
141+
final PageInfo pageInfo = new PageInfo(effectiveSize, effectivePage, totalElements, (long) Math.ceil((double) totalElements / effectiveSize));
142+
return new DelegationWithPagination(delegations, pageInfo);
141143
}
142144

143145
@Override
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package it.pagopa.selfcare.mscore.connector.dao.model;
2+
3+
import lombok.Data;
4+
5+
import java.util.List;
6+
7+
@Data
8+
public class DelegationPageEntity {
9+
10+
private List<PageInfo> pageInfo;
11+
private List<DelegationEntity> delegations;
12+
13+
@Data
14+
public static class PageInfo {
15+
private Long totalElements;
16+
}
17+
18+
}

apps/institution-ms/connector/dao/src/test/java/it/pagopa/selfcare/mscore/connector/dao/DelegationConnectorImplTest.java

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import it.pagopa.selfcare.mscore.connector.dao.model.DelegationEntity;
44
import it.pagopa.selfcare.mscore.connector.dao.model.DelegationInstitutionEntity;
5+
import it.pagopa.selfcare.mscore.connector.dao.model.DelegationPageEntity;
56
import it.pagopa.selfcare.mscore.connector.dao.model.InstitutionEntity;
67
import it.pagopa.selfcare.mscore.connector.dao.model.mapper.DelegationEntityMapper;
78
import it.pagopa.selfcare.mscore.connector.dao.model.mapper.DelegationEntityMapperImpl;
@@ -25,7 +26,6 @@
2526
import org.springframework.data.mongodb.core.MongoTemplate;
2627
import org.springframework.data.mongodb.core.aggregation.Aggregation;
2728
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
28-
import org.springframework.data.mongodb.core.query.Query;
2929
import org.springframework.data.mongodb.core.query.Update;
3030
import org.springframework.data.support.PageableExecutionUtils;
3131
import org.springframework.test.context.ContextConfiguration;
@@ -38,7 +38,6 @@
3838

3939
import static it.pagopa.selfcare.mscore.constant.GenericError.CREATE_DELEGATION_ERROR;
4040
import static org.junit.jupiter.api.Assertions.*;
41-
import static org.junit.jupiter.api.Assertions.assertEquals;
4241
import static org.mockito.ArgumentMatchers.any;
4342
import static org.mockito.ArgumentMatchers.anyString;
4443
import static org.mockito.Mockito.*;
@@ -469,29 +468,40 @@ void findDelegatorsAndDelegatesTest() {
469468
}
470469

471470
@Test
472-
void findFromDateTestWithoutCursorWithSize() {
473-
when(delegationRepository.find(any(), any())).thenReturn(List.of(new DelegationEntity()));
474-
delegationConnectorImpl.findFromDate(OffsetDateTime.now(), null, 10);
475-
final ArgumentCaptor<Query> argumentCaptor = ArgumentCaptor.forClass(Query.class);
476-
verify(delegationRepository).find(argumentCaptor.capture(), any());
477-
final Query query = argumentCaptor.getValue();
478-
assertEquals(10, query.getLimit());
479-
assertEquals(1, query.getQueryObject().size());
480-
assertTrue(query.getQueryObject().containsKey("$or"));
481-
assertFalse(query.getQueryObject().containsKey("createdAt"));
482-
}
483-
484-
@Test
485-
void findFromDateTestWithCursorWithoutSize() {
486-
when(delegationRepository.find(any(), any())).thenReturn(List.of(new DelegationEntity()));
487-
delegationConnectorImpl.findFromDate(OffsetDateTime.now(), 1L, null);
488-
final ArgumentCaptor<Query> argumentCaptor = ArgumentCaptor.forClass(Query.class);
489-
verify(delegationRepository).find(argumentCaptor.capture(), any());
490-
final Query query = argumentCaptor.getValue();
491-
assertEquals(100, query.getLimit());
492-
assertEquals(2, query.getQueryObject().size());
493-
assertTrue(query.getQueryObject().containsKey("$or"));
494-
assertTrue(query.getQueryObject().containsKey("createdAt"));
471+
void findFromDateTest() {
472+
final DelegationPageEntity delegationPageEntity = new DelegationPageEntity();
473+
delegationPageEntity.setDelegations(List.of(
474+
new DelegationEntity(),
475+
new DelegationEntity(),
476+
new DelegationEntity()
477+
));
478+
final DelegationPageEntity.PageInfo pageInfo = new DelegationPageEntity.PageInfo();
479+
pageInfo.setTotalElements(3L);
480+
delegationPageEntity.setPageInfo(List.of(pageInfo));
481+
482+
when(mongoTemplate.aggregate(any(Aggregation.class), eq("Delegations"), eq(DelegationPageEntity.class)))
483+
.thenReturn(new AggregationResults<>(List.of(delegationPageEntity), new Document()));
484+
485+
final DelegationWithPagination result0 = delegationConnectorImpl.findFromDate(OffsetDateTime.now(), 0, 10);
486+
assertEquals(3, result0.getDelegations().size());
487+
assertEquals(0, result0.getPageInfo().getPageNo());
488+
assertEquals(1, result0.getPageInfo().getTotalPages());
489+
assertEquals(3, result0.getPageInfo().getTotalElements());
490+
assertEquals(10, result0.getPageInfo().getPageSize());
491+
492+
final DelegationWithPagination result1 = delegationConnectorImpl.findFromDate(OffsetDateTime.now(), null, null);
493+
assertEquals(3, result1.getDelegations().size());
494+
assertEquals(0, result1.getPageInfo().getPageNo());
495+
assertEquals(1, result1.getPageInfo().getTotalPages());
496+
assertEquals(3, result1.getPageInfo().getTotalElements());
497+
assertEquals(100, result1.getPageInfo().getPageSize());
498+
499+
final DelegationWithPagination result2 = delegationConnectorImpl.findFromDate(OffsetDateTime.now(), -1, -1);
500+
assertEquals(3, result2.getDelegations().size());
501+
assertEquals(0, result2.getPageInfo().getPageNo());
502+
assertEquals(1, result2.getPageInfo().getTotalPages());
503+
assertEquals(3, result2.getPageInfo().getTotalElements());
504+
assertEquals(100, result2.getPageInfo().getPageSize());
495505
}
496506

497507
}

apps/institution-ms/connector/rest/src/main/java/it/pagopa/selfcare/mscore/connector/rest/EventHubConnectorImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ public EventHubConnectorImpl(EventHubRestClient eventHubRestClient,
3939
public boolean sendEvent(DelegationNotificationToSend notification) {
4040
try {
4141
eventHubRestClient.sendMessage(notification, Map.of("Authorization", getSASToken(resourceUri, keyName, key)));
42-
log.info("Event notification of delegation with id {} sent", notification.getId());
42+
log.info("Event notification of delegation with id {} sent", notification.getDelegationId());
4343
return true;
4444
} catch (Exception ex) {
45-
log.error("Error sending event notification of delegation with id {}", notification.getId(), ex);
45+
log.error("Error sending event notification of delegation with id {}", notification.getDelegationId(), ex);
4646
return false;
4747
}
4848
}

apps/institution-ms/core/src/main/java/it/pagopa/selfcare/mscore/core/EventsServiceImpl.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,18 @@
33
import it.pagopa.selfcare.mscore.api.DelegationConnector;
44
import it.pagopa.selfcare.mscore.api.EventHubConnector;
55
import it.pagopa.selfcare.mscore.core.mapper.DelegationNotificationMapper;
6-
import it.pagopa.selfcare.mscore.model.delegation.DelegationWithCursorPagination;
6+
import it.pagopa.selfcare.mscore.model.delegation.DelegationWithPagination;
77
import lombok.RequiredArgsConstructor;
8+
import lombok.extern.slf4j.Slf4j;
89
import org.springframework.stereotype.Service;
910

1011
import java.time.OffsetDateTime;
12+
import java.util.Map;
13+
import java.util.stream.Collectors;
1114

1215
@Service
1316
@RequiredArgsConstructor
17+
@Slf4j
1418
public class EventsServiceImpl implements EventsService {
1519

1620
private static final int DELEGATION_PAGE_SIZE = 100;
@@ -21,11 +25,23 @@ public class EventsServiceImpl implements EventsService {
2125

2226
@Override
2327
public void sendDelegationEvents(OffsetDateTime fromDate) {
24-
DelegationWithCursorPagination delegationsPage = delegationConnector.findFromDate(fromDate, null, DELEGATION_PAGE_SIZE);
25-
while (delegationsPage.getCursor() != null) {
26-
delegationsPage.getDelegations().forEach(d -> eventHubConnector.sendEvent(delegationNotificationMapper.toDelegationNotificationToSend(d)));
27-
delegationsPage = delegationConnector.findFromDate(fromDate, delegationsPage.getCursor(), DELEGATION_PAGE_SIZE);
28-
}
28+
int successCount = 0;
29+
int errorCount = 0;
30+
int page = 0;
31+
log.info("Starting to send delegation events from date: {}", fromDate);
32+
DelegationWithPagination delegationsPage;
33+
do {
34+
delegationsPage = delegationConnector.findFromDate(fromDate, page, DELEGATION_PAGE_SIZE);
35+
final Map<Boolean, Long> results = delegationsPage.getDelegations().stream().collect(Collectors.partitioningBy(
36+
d -> eventHubConnector.sendEvent(delegationNotificationMapper.toDelegationNotificationToSend(d)),
37+
Collectors.counting()
38+
));
39+
successCount += results.get(true);
40+
errorCount += results.get(false);
41+
page++;
42+
log.info("Number of delegation events sent: {} (success: {}, error: {})", successCount + errorCount, successCount, errorCount);
43+
} while (page < delegationsPage.getPageInfo().getTotalPages());
44+
log.info("Finished sending delegation events from date: {}", fromDate);
2945
}
3046

3147
}

apps/institution-ms/core/src/main/java/it/pagopa/selfcare/mscore/core/mapper/DelegationNotificationMapper.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
import org.mapstruct.Mapping;
88

99
import java.time.OffsetDateTime;
10+
import java.time.format.DateTimeFormatter;
11+
import java.time.format.DateTimeFormatterBuilder;
12+
import java.time.temporal.ChronoField;
1013
import java.util.UUID;
1114

1215
@Mapper(componentModel = "spring", imports = UUID.class)
@@ -20,7 +23,13 @@ public interface DelegationNotificationMapper {
2023
DelegationNotificationToSend toDelegationNotificationToSend(Delegation delegation);
2124

2225
default String toOffsetDateTimeString(OffsetDateTime dateTime) {
23-
return dateTime != null ? dateTime.toString() : null;
26+
//final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
27+
final DateTimeFormatter dateTimeFormatter = new DateTimeFormatterBuilder()
28+
.appendPattern("yyyy-MM-dd'T'HH:mm:ss")
29+
.appendFraction(ChronoField.NANO_OF_SECOND, 9, 9, true)
30+
.appendPattern("XXX")
31+
.toFormatter();
32+
return dateTime != null ? dateTime.format(dateTimeFormatter) : null;
2433
}
2534

2635
default EventType toEventType(OffsetDateTime updatedAt, OffsetDateTime closedAt) {

apps/institution-ms/core/src/test/java/it/pagopa/selfcare/mscore/core/EventsServiceImplTest.java

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
import it.pagopa.selfcare.mscore.core.mapper.DelegationNotificationMapperImpl;
1010
import it.pagopa.selfcare.mscore.model.DelegationNotificationToSend;
1111
import it.pagopa.selfcare.mscore.model.delegation.Delegation;
12-
import it.pagopa.selfcare.mscore.model.delegation.DelegationWithCursorPagination;
12+
import it.pagopa.selfcare.mscore.model.delegation.DelegationWithPagination;
13+
import it.pagopa.selfcare.mscore.model.delegation.PageInfo;
1314
import it.pagopa.selfcare.onboarding.common.InstitutionType;
1415
import org.junit.jupiter.api.Test;
1516
import org.junit.jupiter.api.extension.ExtendWith;
@@ -20,7 +21,6 @@
2021
import org.mockito.junit.jupiter.MockitoExtension;
2122

2223
import java.time.OffsetDateTime;
23-
import java.util.ArrayList;
2424
import java.util.List;
2525

2626
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -44,25 +44,20 @@ public class EventsServiceImplTest {
4444

4545
@Test
4646
void sendDelegationEventsTest() {
47-
final DelegationWithCursorPagination delegationsPage0 = new DelegationWithCursorPagination();
47+
final DelegationWithPagination delegationsPage0 = new DelegationWithPagination();
4848
delegationsPage0.setDelegations(List.of(
4949
createDelegation(OffsetDateTime.now(), null, null),
5050
createDelegation(OffsetDateTime.now(), OffsetDateTime.now(), null)
5151
));
52-
delegationsPage0.setCursor(0L);
53-
when(delegationConnector.findFromDate(any(), isNull(), any())).thenReturn(delegationsPage0);
52+
delegationsPage0.setPageInfo(PageInfo.builder().totalPages(2).pageNo(0).totalElements(3).pageSize(2).build());
53+
when(delegationConnector.findFromDate(any(), eq(0), any())).thenReturn(delegationsPage0);
5454

55-
final DelegationWithCursorPagination delegationsPage1 = new DelegationWithCursorPagination();
55+
final DelegationWithPagination delegationsPage1 = new DelegationWithPagination();
5656
delegationsPage1.setDelegations(List.of(
5757
createDelegation(OffsetDateTime.now(), OffsetDateTime.now(), OffsetDateTime.now())
5858
));
59-
delegationsPage1.setCursor(1L);
60-
when(delegationConnector.findFromDate(any(), eq(0L), any())).thenReturn(delegationsPage1);
61-
62-
final DelegationWithCursorPagination delegationsPage2 = new DelegationWithCursorPagination();
63-
delegationsPage2.setDelegations(new ArrayList<>());
64-
delegationsPage2.setCursor(null);
65-
when(delegationConnector.findFromDate(any(), eq(1L), any())).thenReturn(delegationsPage2);
59+
delegationsPage1.setPageInfo(PageInfo.builder().totalPages(2).pageNo(1).totalElements(3).pageSize(2).build());
60+
when(delegationConnector.findFromDate(any(), eq(1), any())).thenReturn(delegationsPage1);
6661

6762
eventsServiceImpl.sendDelegationEvents(OffsetDateTime.now());
6863

@@ -73,10 +68,8 @@ void sendDelegationEventsTest() {
7368
assertEquals(1, sentNotifications.stream().filter(n -> EventType.ADD.equals(n.getEventType())).count());
7469
assertEquals(2, sentNotifications.stream().filter(n -> EventType.UPDATE.equals(n.getEventType())).count());
7570

76-
verify(delegationConnector, times(1)).findFromDate(any(), isNull(), any());
77-
verify(delegationConnector, times(1)).findFromDate(any(), eq(0L), any());
78-
verify(delegationConnector, times(1)).findFromDate(any(), eq(1L), any());
79-
verify(delegationConnector, times(3)).findFromDate(any(), any(), any());
71+
verify(delegationConnector, times(1)).findFromDate(any(), eq(0), any());
72+
verify(delegationConnector, times(1)).findFromDate(any(), eq(1), any());
8073
}
8174

8275
private Delegation createDelegation(OffsetDateTime createdAt, OffsetDateTime updatedAt, OffsetDateTime closedAt) {

0 commit comments

Comments
 (0)