Skip to content

Commit d365aee

Browse files
committed
fix(sub-resource-indexing): Fix item processing on update ownership case (#844)
* fix(sub-resource-indexing): Fix item processing on update ownership case - Process both events with same id in interceptor if it's CREATE and DELETE which have different tenants - Add tenantId filter to call-number relations deletion - Add instance hard deletion in a background job Closes: MSEARCH-1085 (cherry picked from commit 343c161)
1 parent 3d25191 commit d365aee

File tree

15 files changed

+152
-55
lines changed

15 files changed

+152
-55
lines changed

src/main/java/org/folio/search/integration/message/interceptor/PopulateInstanceBatchInterceptor.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.Comparator;
1414
import java.util.List;
1515
import java.util.Map;
16+
import java.util.Objects;
1617
import java.util.stream.Collectors;
1718
import java.util.stream.StreamSupport;
1819
import lombok.extern.log4j.Log4j2;
@@ -64,12 +65,36 @@ public ConsumerRecords<String, ResourceEvent> intercept(ConsumerRecords<String,
6465
if (list.size() > 1) {
6566
list.sort(Comparator.comparingLong(ConsumerRecord::timestamp));
6667
}
67-
consumerRecords.add(list.getLast().value());
68+
if (isUpdateOwnershipEvents(list)) {
69+
consumerRecords.addAll(list.stream().map(ConsumerRecord::value).toList());
70+
} else {
71+
consumerRecords.add(list.getLast().value());
72+
}
6873
}
6974
populate(consumerRecords);
7075
return records;
7176
}
7277

78+
/**
79+
* Needed in case 2 item events with same id come in 1 batch on update ownership case.
80+
* When mod-inventory-storage send CREATE event for new tenant and DELETE event for old tenant.
81+
* DELETE event in such case could have higher timestamp value and
82+
* caller method (intercept) logic would filter out the CREATE event since both events have same id.
83+
* This method helps identify such case.
84+
*/
85+
private boolean isUpdateOwnershipEvents(List<ConsumerRecord<String, ResourceEvent>> records) {
86+
if (records.size() != 2
87+
|| Objects.equals(records.getFirst().value().getTenant(), records.getLast().value().getTenant())) {
88+
return false;
89+
}
90+
var eventTypes = records.stream()
91+
.map(consumerRecord -> consumerRecord.value().getType())
92+
.toList();
93+
94+
return eventTypes.contains(ResourceEventType.CREATE)
95+
&& eventTypes.contains(ResourceEventType.DELETE);
96+
}
97+
7398
private void populate(List<ResourceEvent> records) {
7499
var batchByTenant = records.stream().collect(Collectors.groupingBy(ResourceEvent::getTenant));
75100
batchByTenant.forEach((tenant, batch) -> {

src/main/java/org/folio/search/service/InstanceChildrenResourceService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ public void persistChildren(String tenantId, ResourceType resourceType, List<Res
5454
var eventsForResourceSharing = events.stream()
5555
.filter(SearchConverterUtils::isUpdateEventForResourceSharing)
5656
.toList();
57-
extractors.forEach(resourceExtractor -> resourceExtractor.persistChildren(shared, noShadowCopiesInstanceEvents));
57+
extractors.forEach(resourceExtractor ->
58+
resourceExtractor.persistChildren(tenantId, shared, noShadowCopiesInstanceEvents));
5859
extractors.forEach(resourceExtractor ->
5960
resourceExtractor.persistChildrenForResourceSharing(shared, eventsForResourceSharing));
6061
}

src/main/java/org/folio/search/service/ScheduledInstanceSubResourcesService.java

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package org.folio.search.service;
22

3-
import static java.util.Collections.emptyList;
43
import static java.util.function.UnaryOperator.identity;
54
import static java.util.stream.Collectors.toMap;
65
import static org.apache.commons.collections4.MapUtils.getString;
@@ -10,7 +9,6 @@
109
import java.util.LinkedHashMap;
1110
import java.util.List;
1211
import java.util.Map;
13-
import java.util.Optional;
1412
import java.util.stream.Collectors;
1513
import lombok.extern.log4j.Log4j2;
1614
import org.folio.search.configuration.properties.SearchConfigurationProperties;
@@ -156,7 +154,7 @@ private void processInstanceOrItemEntities(ReindexEntityType entityType, String
156154
log.debug("processInstanceOrItemEntities::Processing {} {} entities for tenant {}",
157155
result.records().size(), entityType, tenant);
158156

159-
var eventsByDeleted = result.records().stream()
157+
var events = result.records().stream()
160158
.map(recordMap -> {
161159
var isDeleted = Boolean.TRUE.equals(recordMap.get("isDeleted"));
162160
var eventType = isDeleted ? ResourceEventType.DELETE : ResourceEventType.UPDATE;
@@ -173,39 +171,35 @@ private void processInstanceOrItemEntities(ReindexEntityType entityType, String
173171

174172
return resourceEvent;
175173
})
176-
.collect(Collectors.groupingBy(event -> ResourceEventType.DELETE == event.getType()));
177-
178-
var deleteEvents = Optional.ofNullable(eventsByDeleted.get(true)).orElse(emptyList());
179-
//process delete events first in case 2 events for the same entity, f.e. change of ownership
180-
deleteEvents.stream()
181-
.collect(Collectors.groupingBy(ResourceEvent::getTenant)).forEach((eventsTenant, tenantEvents) ->
182-
instanceChildrenResourceService.persistChildren(eventsTenant, resourceType, tenantEvents));
183-
//process create/update events
184-
Optional.ofNullable(eventsByDeleted.get(false)).orElse(emptyList()).stream()
174+
.toList();
175+
events.stream()
185176
.collect(Collectors.groupingBy(ResourceEvent::getTenant)).forEach((eventsTenant, tenantEvents) ->
186177
instanceChildrenResourceService.persistChildren(eventsTenant, resourceType, tenantEvents));
187178

188-
// Hard delete entities marked as deleted
189-
if (!deleteEvents.isEmpty()) {
179+
var deletedEntities = events.stream()
180+
.filter(event -> ResourceEventType.DELETE == event.getType())
181+
.toList();
182+
183+
if (!deletedEntities.isEmpty()) {
190184
var repository = (MergeRangeRepository) repositories.get(entityType);
191185
if (repository != null) {
192186
log.debug("processInstanceOrItemEntities::Hard deleting {} {} entities with IDs: {}",
193-
deleteEvents.size(), entityType, deleteEvents);
187+
deletedEntities.size(), entityType, deletedEntities);
194188

195189
if (entityType == ReindexEntityType.ITEM) {
196190
// Items need tenant-specific deletion
197-
deleteEvents.stream()
191+
deletedEntities.stream()
198192
.collect(Collectors.groupingBy(ResourceEvent::getTenant,
199193
Collectors.mapping(ResourceEvent::getId, Collectors.toList())))
200194
.forEach((groupTenant, ids) ->
201195
repository.deleteEntitiesForTenant(ids, groupTenant, true)
202196
);
203197
} else {
204198
// Instances use regular deletion
205-
var idsForDelete = deleteEvents.stream()
199+
var idsForDelete = deletedEntities.stream()
206200
.map(ResourceEvent::getId)
207201
.toList();
208-
repository.deleteEntities(idsForDelete);
202+
repository.deleteEntities(idsForDelete, true);
209203
}
210204
}
211205
}

src/main/java/org/folio/search/service/converter/preprocessor/extractor/ChildResourceExtractor.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,17 @@ protected abstract List<Map<String, Object>> constructRelations(boolean shared,
3838

3939
protected abstract String childrenFieldName();
4040

41-
public void persistChildren(boolean shared, List<ResourceEvent> events) {
42-
var instanceIdsForDeletion = events.stream()
41+
public void persistChildren(String tenantId, boolean shared, List<ResourceEvent> events) {
42+
var parentIdsForDeletion = events.stream()
4343
.filter(event -> event.getType() != ResourceEventType.CREATE && event.getType() != ResourceEventType.REINDEX)
4444
.map(ResourceEvent::getId)
4545
.toList();
46-
if (!instanceIdsForDeletion.isEmpty()) {
47-
repository.deleteByInstanceIds(instanceIdsForDeletion, null);
46+
if (!parentIdsForDeletion.isEmpty()) {
47+
if (!events.isEmpty() && ResourceType.ITEM.getName().equals(events.getFirst().getResourceName())) {
48+
repository.deleteByInstanceIds(parentIdsForDeletion, tenantId);
49+
} else {
50+
repository.deleteByInstanceIds(parentIdsForDeletion, null);
51+
}
4852
}
4953

5054
var eventsForSaving = events.stream()

src/main/java/org/folio/search/service/reindex/jdbc/CallNumberRepository.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Map;
2323
import java.util.Objects;
2424
import java.util.Optional;
25+
import java.util.stream.Stream;
2526
import lombok.extern.log4j.Log4j2;
2627
import org.folio.search.configuration.properties.ReindexConfigurationProperties;
2728
import org.folio.search.model.entity.ChildResourceEntityBatch;
@@ -66,7 +67,7 @@ public class CallNumberRepository extends UploadRangeRepository implements Insta
6667
WITH deleted_ids as (
6768
DELETE
6869
FROM %1$s.instance_call_number
69-
WHERE item_id IN (%2$s)
70+
WHERE item_id IN (%2$s) %3$s
7071
RETURNING call_number_id
7172
)
7273
UPDATE %1$s.call_number
@@ -165,8 +166,12 @@ protected CallNumberRepository(JdbcTemplate jdbcTemplate, JsonConverter jsonConv
165166

166167
@Override
167168
public void deleteByInstanceIds(List<String> itemIds, String tenantId) {
168-
var sql = DELETE_QUERY.formatted(JdbcUtils.getSchemaName(context), getParamPlaceholderForUuid(itemIds.size()));
169-
jdbcTemplate.update(sql, itemIds.toArray());
169+
var sql = DELETE_QUERY.formatted(JdbcUtils.getSchemaName(context), getParamPlaceholderForUuid(itemIds.size()),
170+
tenantId == null ? "" : "AND tenant_id = ?");
171+
var params = tenantId == null
172+
? itemIds.toArray()
173+
: Stream.of(itemIds, List.of(tenantId)).flatMap(List::stream).toArray();
174+
jdbcTemplate.update(sql, params);
170175
}
171176

172177
@Override

src/test/java/org/folio/search/integration/message/interceptor/PopulateInstanceBatchInterceptorTest.java

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package org.folio.search.integration.message.interceptor;
22

3+
import static org.folio.search.utils.TestConstants.MEMBER2_TENANT_ID;
4+
import static org.folio.search.utils.TestConstants.MEMBER_TENANT_ID;
5+
import static org.folio.search.utils.TestConstants.TENANT_ID;
36
import static org.mockito.ArgumentMatchers.any;
47
import static org.mockito.ArgumentMatchers.eq;
58
import static org.mockito.Mockito.doAnswer;
@@ -20,7 +23,9 @@
2023
import org.apache.kafka.common.header.internals.RecordHeaders;
2124
import org.apache.kafka.common.record.TimestampType;
2225
import org.folio.search.domain.dto.ResourceEvent;
26+
import org.folio.search.domain.dto.ResourceEventType;
2327
import org.folio.search.service.consortium.ConsortiumTenantExecutor;
28+
import org.folio.search.service.reindex.jdbc.ItemRepository;
2429
import org.folio.search.service.reindex.jdbc.MergeInstanceRepository;
2530
import org.folio.spring.exception.SystemUserAuthorizationException;
2631
import org.folio.spring.service.SystemUserScopedExecutionService;
@@ -33,24 +38,25 @@
3338
@ExtendWith(MockitoExtension.class)
3439
class PopulateInstanceBatchInterceptorTest {
3540

36-
private static final String TENANT_ID = "tenantId";
37-
3841
@Mock
3942
private ConsortiumTenantExecutor executionService;
4043
@Mock
4144
private SystemUserScopedExecutionService systemUserScopedExecutionService;
4245
@Mock
4346
private MergeInstanceRepository instanceRepository;
4447
@Mock
48+
private ItemRepository itemRepository;
49+
@Mock
4550
private Consumer<String, ResourceEvent> consumer;
4651

4752
private PopulateInstanceBatchInterceptor populateInstanceBatchInterceptor;
4853

4954
@BeforeEach
5055
void setUp() {
5156
when(instanceRepository.entityType()).thenCallRealMethod();
57+
when(itemRepository.entityType()).thenCallRealMethod();
5258
populateInstanceBatchInterceptor = new PopulateInstanceBatchInterceptor(
53-
List.of(instanceRepository),
59+
List.of(instanceRepository, itemRepository),
5460
executionService,
5561
systemUserScopedExecutionService
5662
);
@@ -111,6 +117,29 @@ void shouldProcessOnlyLatestRecordsSuccessfullyInIntercept() {
111117
verify(instanceRepository).saveEntities(TENANT_ID, List.of(expected));
112118
}
113119

120+
@Test
121+
void shouldProcessAllRecordsForSameIdWhenUpdateOwnership() {
122+
// Arrange
123+
mockExecutionServices();
124+
125+
var now = System.currentTimeMillis();
126+
var expected = List.of(Map.<String, Object>of("id", 1),
127+
Map.<String, Object>of("id", 2));
128+
var consumerRecord1 = createConsumerRecord(MEMBER_TENANT_ID,
129+
ResourceEventType.CREATE, "item", "1", expected.get(0), now);
130+
var consumerRecord2 = createConsumerRecord(MEMBER2_TENANT_ID,
131+
ResourceEventType.DELETE, "item", "2", expected.get(1), now + 1);
132+
var records = new ConsumerRecords<>(Map.of(new TopicPartition("topic", 0),
133+
List.of(consumerRecord2, consumerRecord1)));
134+
135+
// Act
136+
populateInstanceBatchInterceptor.intercept(records, consumer);
137+
138+
// Assert
139+
verify(itemRepository).saveEntities(MEMBER_TENANT_ID, List.of(expected.get(0)));
140+
verify(itemRepository).deleteEntitiesForTenant(List.of("2"), MEMBER2_TENANT_ID);
141+
}
142+
114143
private void mockExecutionServices() {
115144
doAnswer(invocation -> {
116145
var operation = invocation.<Supplier<?>>getArgument(0);
@@ -124,9 +153,17 @@ private void mockExecutionServices() {
124153
}
125154

126155
private ConsumerRecord<String, ResourceEvent> createConsumerRecord(Map<String, Object> resourceNew, long timestamp) {
156+
return createConsumerRecord(TENANT_ID, ResourceEventType.CREATE, "instance", null, resourceNew, timestamp);
157+
}
158+
159+
private ConsumerRecord<String, ResourceEvent> createConsumerRecord(String tenant, ResourceEventType type,
160+
String resource, String id,
161+
Map<String, Object> resourceNew, long timestamp) {
127162
var resourceEvent = new ResourceEvent()
128-
.tenant(TENANT_ID)
129-
.resourceName("instance")
163+
.id(id)
164+
.tenant(tenant)
165+
.type(type)
166+
.resourceName(resource)
130167
._new(resourceNew);
131168
return new ConsumerRecord<>("topic", 0, 0L, timestamp, TimestampType.CREATE_TIME, 0, 0, "key",
132169
resourceEvent, new RecordHeaders(), Optional.empty());

src/test/java/org/folio/search/service/InstanceChildrenResourceServiceTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ void persistChildren(boolean shared) {
5959
service.persistChildren(TENANT_ID, ResourceType.INSTANCE, events);
6060

6161
resourceExtractors.forEach(resourceExtractor ->
62-
verify(resourceExtractor).persistChildren(shared, events));
62+
verify(resourceExtractor).persistChildren(TENANT_ID, shared, events));
6363
}
6464

6565
@ParameterizedTest
@@ -74,7 +74,7 @@ void persistChildrenOnReindex(boolean shared) {
7474
service.persistChildrenOnReindex(TENANT_ID, ResourceType.INSTANCE, instances);
7575

7676
resourceExtractors.forEach(resourceExtractor ->
77-
verify(resourceExtractor).persistChildren(shared, expectedEvents));
77+
verify(resourceExtractor).persistChildren(TENANT_ID, shared, expectedEvents));
7878
}
7979

8080
private ResourceEvent getResourceEvent(UUID id1, Map<String, Object> payload) {

src/test/java/org/folio/search/service/converter/preprocessor/extractor/CallNumberResourceExtractorTest.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,12 @@
55
import static org.folio.search.service.converter.preprocessor.extractor.impl.CallNumberResourceExtractor.PREFIX_FIELD;
66
import static org.folio.search.service.converter.preprocessor.extractor.impl.CallNumberResourceExtractor.SUFFIX_FIELD;
77
import static org.folio.search.service.converter.preprocessor.extractor.impl.CallNumberResourceExtractor.TYPE_ID_FIELD;
8+
import static org.folio.search.utils.TestConstants.TENANT_ID;
89
import static org.folio.search.utils.TestUtils.mapOf;
10+
import static org.mockito.ArgumentMatchers.anyList;
11+
import static org.mockito.ArgumentMatchers.eq;
12+
import static org.mockito.Mockito.times;
13+
import static org.mockito.Mockito.verify;
914
import static org.mockito.Mockito.when;
1015

1116
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -51,15 +56,17 @@ void setUp() {
5156
void persistChildren() {
5257
when(configService.isEnabled(TenantConfiguredFeature.BROWSE_CALL_NUMBERS)).thenReturn(true);
5358
persistChildrenTest(extractor, repository, callNumberBodySupplier());
59+
verify(repository, times(2)).deleteByInstanceIds(anyList(), eq(TENANT_ID));
5460
}
5561

5662
private static Supplier<Map<String, Object>> callNumberBodySupplier() {
57-
return () -> mapOf(EFFECTIVE_CALL_NUMBER_COMPONENTS_FIELD, mapOf(
63+
return () -> Map.of("resource", "item",
64+
"body", mapOf(EFFECTIVE_CALL_NUMBER_COMPONENTS_FIELD, mapOf(
5865
CALL_NUMBER_FIELD, "call-number",
5966
SUFFIX_FIELD, "suffix",
6067
PREFIX_FIELD, "prefix",
6168
TYPE_ID_FIELD, "type-id"
6269
)
63-
);
70+
));
6471
}
6572
}

src/test/java/org/folio/search/service/converter/preprocessor/extractor/ChildResourceExtractorTestBase.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,24 +26,28 @@
2626
public abstract class ChildResourceExtractorTestBase {
2727

2828
void persistChildrenTest(ChildResourceExtractor extractor, InstanceChildResourceRepository repository,
29-
Supplier<Map<String, Object>> eventBodySupplier) {
29+
Supplier<Map<String, Object>> eventDataSupplier) {
30+
var eventData = eventDataSupplier.get();
31+
var resource = eventData.get("resource").toString();
32+
var eventBodySupplier = (Supplier<Map<String, Object>>) () ->
33+
(Map<String, Object>) eventDataSupplier.get().get("body");
3034
var eventBody = eventBodySupplier.get();
3135
var oldBody = new HashMap<>(eventBodySupplier.get());
3236
var newBody = new HashMap<>(eventBodySupplier.get());
3337
oldBody.put(SOURCE_FIELD, "FOLIO");
3438
newBody.put(SOURCE_FIELD, SOURCE_CONSORTIUM_PREFIX + "FOLIO");
3539
var events = List.of(
36-
resourceEvent(ResourceEventType.CREATE, eventBody),
37-
resourceEvent(ResourceEventType.REINDEX, eventBody),
38-
resourceEvent(ResourceEventType.UPDATE, noMainValuesBody()),
39-
resourceEvent(ResourceEventType.UPDATE, eventBodySupplier.get()),
40-
resourceEvent(ResourceEventType.DELETE, eventBodySupplier.get()));
40+
resourceEvent(ResourceEventType.CREATE, resource, eventBody),
41+
resourceEvent(ResourceEventType.REINDEX, resource, eventBody),
42+
resourceEvent(ResourceEventType.UPDATE, resource, noMainValuesBody()),
43+
resourceEvent(ResourceEventType.UPDATE, resource, eventBodySupplier.get()),
44+
resourceEvent(ResourceEventType.DELETE, resource, eventBodySupplier.get()));
4145

42-
var sharedResourceEvent = resourceEvent(ResourceEventType.UPDATE, oldBody, newBody);
46+
var sharedResourceEvent = resourceEvent(ResourceEventType.UPDATE, resource, oldBody, newBody);
4347
var instanceIdsForDeletion = List.of(events.get(2).getId(), events.get(3).getId(), events.get(4).getId());
4448
var sharedInstanceIds = List.of(sharedResourceEvent.getId());
4549

46-
extractor.persistChildren(false, events);
50+
extractor.persistChildren(TENANT_ID, false, events);
4751
extractor.persistChildrenForResourceSharing(false, List.of(sharedResourceEvent));
4852

4953
verify(repository, times(2)).deleteByInstanceIds(
@@ -73,16 +77,22 @@ private Map<String, Object> noMainValuesBody() {
7377
)));
7478
}
7579

76-
private ResourceEvent resourceEvent(ResourceEventType type, Map<String, Object> body) {
77-
return resourceEvent(type, null, body);
80+
protected ResourceEvent resourceEvent(ResourceEventType type, Map<String, Object> body) {
81+
return resourceEvent(type, null, null, body);
82+
}
83+
84+
protected ResourceEvent resourceEvent(ResourceEventType type, String resource, Map<String, Object> body) {
85+
return resourceEvent(type, resource, null, body);
7886
}
7987

8088
private ResourceEvent resourceEvent(ResourceEventType type,
89+
String resource,
8190
Map<String, Object> oldBody,
8291
Map<String, Object> newBody) {
8392
return new ResourceEvent()
8493
.id(UUID.randomUUID().toString())
8594
.type(type)
95+
.resourceName(resource)
8696
.tenant(TENANT_ID)
8797
._new(newBody)
8898
.old(oldBody);

0 commit comments

Comments
 (0)