Skip to content

Commit 0f5fe59

Browse files
committed
fix data move events
1 parent 850c186 commit 0f5fe59

File tree

3 files changed

+98
-113
lines changed

3 files changed

+98
-113
lines changed

src/main/java/org/folio/search/integration/message/InstanceEventMapper.java

Lines changed: 55 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@
22

33
import static org.folio.search.configuration.kafka.KafkaConfiguration.SearchTopic.INDEX_INSTANCE;
44
import static org.folio.search.utils.SearchConverterUtils.getEventPayload;
5+
import static org.folio.search.utils.SearchConverterUtils.getNewAsMap;
6+
import static org.folio.search.utils.SearchConverterUtils.getOldAsMap;
57
import static org.folio.search.utils.SearchUtils.ID_FIELD;
68
import static org.folio.search.utils.SearchUtils.INSTANCE_ID_FIELD;
79

10+
import java.util.List;
811
import lombok.RequiredArgsConstructor;
912
import org.apache.commons.collections4.MapUtils;
1013
import org.apache.kafka.clients.consumer.ConsumerRecord;
1114
import org.apache.kafka.clients.producer.ProducerRecord;
15+
import org.apache.kafka.common.header.Headers;
1216
import org.folio.search.domain.dto.ResourceEvent;
13-
import org.folio.search.domain.dto.ResourceEventType;
1417
import org.folio.search.model.event.IndexInstanceEvent;
1518
import org.folio.search.service.consortium.ConsortiumTenantService;
1619
import org.springframework.stereotype.Component;
@@ -30,32 +33,67 @@ public class InstanceEventMapper {
3033
* @param event the consumer record containing resource event
3134
* @return producer record ready to be sent to Kafka
3235
*/
33-
public ProducerRecord<String, IndexInstanceEvent> mapToProducerRecord(ConsumerRecord<String, ResourceEvent> event) {
34-
var instanceId = extractInstanceId(event);
35-
var eventTenant = event.value().getTenant();
36+
public List<ProducerRecord<String, IndexInstanceEvent>> mapToProducerRecords(
37+
ConsumerRecord<String, ResourceEvent> event) {
38+
if (isInstanceResource(event.topic())) {
39+
var producerRecord = toProducerRecord(event.value(), event.headers(), event.topic());
40+
return List.of(producerRecord);
41+
} else {
42+
return extractEventsForDataMove(event.value()).stream()
43+
.map(resourceEvent -> toProducerRecord(resourceEvent, event.headers(), event.topic()))
44+
.toList();
45+
}
46+
}
47+
48+
private ProducerRecord<String, IndexInstanceEvent> toProducerRecord(ResourceEvent resourceEvent, Headers headers,
49+
String topic) {
50+
var instanceId = extractInstanceId(resourceEvent, isInstanceResource(topic));
51+
var eventTenant = resourceEvent.getTenant();
3652
var targetTenant = consortiumTenantService.getCentralTenant(eventTenant).orElse(eventTenant);
3753
var indexInstanceEvent = new IndexInstanceEvent(targetTenant, instanceId);
3854

39-
return new ProducerRecordBuilder<>(
40-
INDEX_INSTANCE.fullTopicName(targetTenant),
41-
instanceId,
42-
indexInstanceEvent,
43-
event.headers())
55+
return new ProducerRecordBuilder<>(getFullTopicName(targetTenant), instanceId, indexInstanceEvent, headers)
4456
.withUpdatedTenantHeaders(targetTenant);
4557
}
4658

47-
private String extractInstanceId(ConsumerRecord<String, ResourceEvent> event) {
48-
var body = event.value();
49-
if (body.getType() == ResourceEventType.REINDEX) {
50-
return event.key();
51-
}
59+
private String getFullTopicName(String targetTenant) {
60+
return INDEX_INSTANCE.fullTopicName(targetTenant);
61+
}
62+
63+
private String extractInstanceId(ResourceEvent body, boolean instanceResource) {
5264
var eventPayload = getEventPayload(body);
53-
return isInstanceResource(event)
65+
return instanceResource
5466
? MapUtils.getString(eventPayload, ID_FIELD)
5567
: MapUtils.getString(eventPayload, INSTANCE_ID_FIELD);
5668
}
5769

58-
private boolean isInstanceResource(ConsumerRecord<String, ResourceEvent> event) {
59-
return event.topic().endsWith("inventory.instance");
70+
private boolean isInstanceResource(String topic) {
71+
return topic.endsWith("inventory.instance");
72+
}
73+
74+
/**
75+
* There may be a case when some data is moved between instances.
76+
* In such case old and new fields of the event will have different instanceId.
77+
* This method will create 2 events out of 1 and erase 'old' field in an original event.
78+
*/
79+
private List<ResourceEvent> extractEventsForDataMove(ResourceEvent resourceEvent) {
80+
if (resourceEvent == null) {
81+
return List.of();
82+
}
83+
84+
var oldMap = getOldAsMap(resourceEvent);
85+
var newMap = getNewAsMap(resourceEvent);
86+
var oldInstanceId = oldMap.get(INSTANCE_ID_FIELD);
87+
88+
if (oldInstanceId != null && !oldInstanceId.equals(newMap.get(INSTANCE_ID_FIELD))) {
89+
var oldEvent = new ResourceEvent().id(String.valueOf(oldInstanceId))
90+
.resourceName(resourceEvent.getResourceName())
91+
.type(resourceEvent.getType())
92+
.tenant(resourceEvent.getTenant())
93+
._new(resourceEvent.getOld());
94+
var newEvent = resourceEvent.old(null);
95+
return List.of(oldEvent, newEvent);
96+
}
97+
return List.of(resourceEvent);
6098
}
6199
}

src/main/java/org/folio/search/integration/message/KafkaMessageListener.java

Lines changed: 2 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,13 @@
11
package org.folio.search.integration.message;
22

3-
import static org.apache.commons.collections4.MapUtils.getString;
4-
import static org.apache.commons.lang3.RegExUtils.replaceAll;
53
import static org.folio.search.configuration.RetryTemplateConfiguration.KAFKA_RETRY_TEMPLATE_NAME;
64
import static org.folio.search.configuration.SearchCacheNames.REFERENCE_DATA_CACHE;
7-
import static org.folio.search.domain.dto.ResourceEventType.CREATE;
85
import static org.folio.search.domain.dto.ResourceEventType.DELETE;
9-
import static org.folio.search.domain.dto.ResourceEventType.REINDEX;
10-
import static org.folio.search.utils.SearchConverterUtils.getEventPayload;
116
import static org.folio.search.utils.SearchConverterUtils.getResourceEventId;
127
import static org.folio.search.utils.SearchConverterUtils.getResourceSource;
13-
import static org.folio.search.utils.SearchUtils.ID_FIELD;
14-
import static org.folio.search.utils.SearchUtils.INSTANCE_ID_FIELD;
158
import static org.folio.search.utils.SearchUtils.SOURCE_CONSORTIUM_PREFIX;
169

1710
import java.util.List;
18-
import java.util.Objects;
1911
import java.util.function.Consumer;
2012
import java.util.function.Predicate;
2113
import java.util.stream.Collectors;
@@ -68,7 +60,8 @@ public void handleInstanceEvents(List<ConsumerRecord<String, ResourceEvent>> con
6860
consumerRecords.stream().collect(Collectors.groupingBy(consumerRecord -> consumerRecord.value().getTenant()))
6961
.forEach((tenant, records) -> executionService.executeSystemUserScoped(tenant, () -> {
7062
records.stream()
71-
.map(instanceEventMapper::mapToProducerRecord)
63+
.map(instanceEventMapper::mapToProducerRecords)
64+
.flatMap(List::stream)
7265
.forEach(instanceEventProducer::send);
7366
return null;
7467
}));
@@ -189,38 +182,6 @@ private void indexResources(List<ResourceEvent> batch, Consumer<List<ResourceEve
189182
}));
190183
}
191184

192-
private static List<ResourceEvent> getInstanceResourceEvents(List<ConsumerRecord<String, ResourceEvent>> events) {
193-
return events.stream()
194-
.map(KafkaMessageListener::getInstanceResourceEvent)
195-
.filter(Objects::nonNull)
196-
.distinct()
197-
.toList();
198-
}
199-
200-
private static ResourceEvent getInstanceResourceEvent(ConsumerRecord<String, ResourceEvent> consumerRecord) {
201-
var instanceId = getInstanceId(consumerRecord);
202-
var value = consumerRecord.value();
203-
if (instanceId == null) {
204-
log.warn("Failed to find instance id in record [record: {}]", replaceAll(value.toString(), "\\s+", " "));
205-
return null;
206-
}
207-
var operation = isInstanceResource(consumerRecord) ? value.getType() : CREATE;
208-
return value.id(instanceId).type(operation);
209-
}
210-
211-
private static String getInstanceId(ConsumerRecord<String, ResourceEvent> event) {
212-
var body = event.value();
213-
if (body.getType() == REINDEX) {
214-
return event.key();
215-
}
216-
var eventPayload = getEventPayload(body);
217-
return isInstanceResource(event) ? getString(eventPayload, ID_FIELD) : getString(eventPayload, INSTANCE_ID_FIELD);
218-
}
219-
220-
private static boolean isInstanceResource(ConsumerRecord<String, ResourceEvent> consumerRecord) {
221-
return consumerRecord.topic().endsWith("inventory.instance");
222-
}
223-
224185
private static void logFailedEvent(ResourceEvent event, Exception e) {
225186
if (event == null) {
226187
log.warn("Failed to index resource event [event: null]", e);

src/test/java/org/folio/search/integration/message/InstanceEventMapperTest.java

Lines changed: 41 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import static org.assertj.core.api.Assertions.assertThat;
44
import static org.folio.search.domain.dto.ResourceEventType.CREATE;
55
import static org.folio.search.domain.dto.ResourceEventType.DELETE;
6-
import static org.folio.search.domain.dto.ResourceEventType.REINDEX;
76
import static org.folio.search.domain.dto.ResourceEventType.UPDATE;
87
import static org.folio.support.utils.TestUtils.mapOf;
98
import static org.folio.support.utils.TestUtils.randomId;
@@ -57,14 +56,14 @@ void mapToProducerRecord_shouldMapInstanceCreateEvent() {
5756

5857
var consumerRecord = createConsumerRecord(INSTANCE_ID, resourceEvent, INSTANCE_TOPIC);
5958

60-
var result = mapper.mapToProducerRecord(consumerRecord);
59+
var result = mapper.mapToProducerRecords(consumerRecord);
6160

6261
assertThat(result).isNotNull();
63-
assertThat(result.key()).isEqualTo(INSTANCE_ID);
64-
assertThat(result.value()).isNotNull();
65-
assertThat(result.value().instanceId()).isEqualTo(INSTANCE_ID);
66-
assertThat(result.value().tenant()).isEqualTo(TENANT_ID);
67-
assertThat(result.topic()).isEqualTo(
62+
assertThat(result.getFirst().key()).isEqualTo(INSTANCE_ID);
63+
assertThat(result.getFirst().value()).isNotNull();
64+
assertThat(result.getFirst().value().instanceId()).isEqualTo(INSTANCE_ID);
65+
assertThat(result.getFirst().value().tenant()).isEqualTo(TENANT_ID);
66+
assertThat(result.getFirst().topic()).isEqualTo(
6867
KafkaConfiguration.SearchTopic.INDEX_INSTANCE.fullTopicName(TENANT_ID));
6968
}
7069

@@ -77,10 +76,10 @@ void mapToProducerRecord_shouldMapInstanceUpdateEvent() {
7776

7877
var consumerRecord = createConsumerRecord(INSTANCE_ID, resourceEvent, INSTANCE_TOPIC);
7978

80-
var result = mapper.mapToProducerRecord(consumerRecord);
79+
var result = mapper.mapToProducerRecords(consumerRecord);
8180

82-
assertThat(result.value().instanceId()).isEqualTo(INSTANCE_ID);
83-
assertThat(result.value().tenant()).isEqualTo(TENANT_ID);
81+
assertThat(result.getFirst().value().instanceId()).isEqualTo(INSTANCE_ID);
82+
assertThat(result.getFirst().value().tenant()).isEqualTo(TENANT_ID);
8483
}
8584

8685
@Test
@@ -91,42 +90,29 @@ void mapToProducerRecord_shouldMapInstanceDeleteEvent() {
9190

9291
var consumerRecord = createConsumerRecord(INSTANCE_ID, resourceEvent, INSTANCE_TOPIC);
9392

94-
var result = mapper.mapToProducerRecord(consumerRecord);
93+
var result = mapper.mapToProducerRecords(consumerRecord);
9594

96-
assertThat(result.value().instanceId()).isEqualTo(INSTANCE_ID);
97-
assertThat(result.value().tenant()).isEqualTo(TENANT_ID);
95+
assertThat(result.getFirst().value().instanceId()).isEqualTo(INSTANCE_ID);
96+
assertThat(result.getFirst().value().tenant()).isEqualTo(TENANT_ID);
9897
}
9998

10099
@Test
101-
void mapToProducerRecord_shouldMapReindexEventUsingKey() {
102-
var resourceEvent = resourceEvent(null, ResourceType.INSTANCE, REINDEX);
103-
resourceEvent.tenant(TENANT_ID);
104-
105-
var consumerRecord = createConsumerRecord(INSTANCE_ID, resourceEvent, INSTANCE_TOPIC);
106-
107-
var result = mapper.mapToProducerRecord(consumerRecord);
108-
109-
assertThat(result.value().instanceId()).isEqualTo(INSTANCE_ID);
110-
assertThat(result.value().tenant()).isEqualTo(TENANT_ID);
111-
}
112-
113-
@Test
114-
void mapToProducerRecord_shouldExtractInstanceIdFromItemEvent() {
100+
void mapToProducerRecords_shouldExtractInstanceIdFromItemEvent() {
115101
var itemId = randomId();
116102
var resourceEvent = resourceEvent(null, ResourceType.ITEM, CREATE,
117103
mapOf("id", itemId, "instanceId", INSTANCE_ID), null);
118104
resourceEvent.tenant(TENANT_ID);
119105

120106
var consumerRecord = createConsumerRecord(itemId, resourceEvent, ITEM_TOPIC);
121107

122-
var result = mapper.mapToProducerRecord(consumerRecord);
108+
var result = mapper.mapToProducerRecords(consumerRecord);
123109

124-
assertThat(result.value().instanceId()).isEqualTo(INSTANCE_ID);
125-
assertThat(result.value().tenant()).isEqualTo(TENANT_ID);
110+
assertThat(result.getFirst().value().instanceId()).isEqualTo(INSTANCE_ID);
111+
assertThat(result.getFirst().value().tenant()).isEqualTo(TENANT_ID);
126112
}
127113

128114
@Test
129-
void mapToProducerRecord_shouldExtractInstanceIdFromHoldingEvent() {
115+
void mapToProducerRecords_shouldExtractInstanceIdFromHoldingEvent() {
130116
var holdingId = randomId();
131117
var holdingTopic = "folio.test-tenant.inventory.holding";
132118
var resourceEvent = resourceEvent(null, ResourceType.HOLDINGS, CREATE,
@@ -135,14 +121,14 @@ void mapToProducerRecord_shouldExtractInstanceIdFromHoldingEvent() {
135121

136122
var consumerRecord = createConsumerRecord(holdingId, resourceEvent, holdingTopic);
137123

138-
var result = mapper.mapToProducerRecord(consumerRecord);
124+
var result = mapper.mapToProducerRecords(consumerRecord);
139125

140-
assertThat(result.value().instanceId()).isEqualTo(INSTANCE_ID);
141-
assertThat(result.value().tenant()).isEqualTo(TENANT_ID);
126+
assertThat(result.getFirst().value().instanceId()).isEqualTo(INSTANCE_ID);
127+
assertThat(result.getFirst().value().tenant()).isEqualTo(TENANT_ID);
142128
}
143129

144130
@Test
145-
void mapToProducerRecord_shouldUseCentralTenantWhenAvailable() {
131+
void mapToProducerRecords_shouldUseCentralTenantWhenAvailable() {
146132
when(consortiumTenantService.getCentralTenant(TENANT_ID))
147133
.thenReturn(Optional.of(CENTRAL_TENANT_ID));
148134

@@ -152,14 +138,14 @@ void mapToProducerRecord_shouldUseCentralTenantWhenAvailable() {
152138

153139
var consumerRecord = createConsumerRecord(INSTANCE_ID, resourceEvent, INSTANCE_TOPIC);
154140

155-
var result = mapper.mapToProducerRecord(consumerRecord);
141+
var result = mapper.mapToProducerRecords(consumerRecord);
156142

157-
assertThat(result.value().tenant()).isEqualTo(CENTRAL_TENANT_ID);
158-
assertThat(result.topic()).contains(CENTRAL_TENANT_ID);
143+
assertThat(result.getFirst().value().tenant()).isEqualTo(CENTRAL_TENANT_ID);
144+
assertThat(result.getFirst().topic()).contains(CENTRAL_TENANT_ID);
159145
}
160146

161147
@Test
162-
void mapToProducerRecord_shouldCopyHeadersToProducerRecord() {
148+
void mapToProducerRecord_shouldCopyHeadersToProducerRecords() {
163149
var resourceEvent = resourceEvent(null, ResourceType.INSTANCE, CREATE,
164150
mapOf("id", INSTANCE_ID), null);
165151
resourceEvent.tenant(TENANT_ID);
@@ -173,16 +159,16 @@ void mapToProducerRecord_shouldCopyHeadersToProducerRecord() {
173159
consumerRecord.headers().add("X-Custom-Header", "custom-value".getBytes(StandardCharsets.UTF_8));
174160
consumerRecord.headers().add(XOkapiHeaders.URL, "http://okapi:9130".getBytes(StandardCharsets.UTF_8));
175161

176-
var result = mapper.mapToProducerRecord(consumerRecord);
162+
var result = mapper.mapToProducerRecords(consumerRecord);
177163

178-
assertThat(result.headers()).isNotNull();
164+
assertThat(result.getFirst().headers()).isNotNull();
179165
var headerKeys = new java.util.ArrayList<String>();
180-
result.headers().forEach(header -> headerKeys.add(header.key()));
166+
result.getFirst().headers().forEach(header -> headerKeys.add(header.key()));
181167
assertThat(headerKeys).contains(XOkapiHeaders.URL);
182168
}
183169

184170
@Test
185-
void mapToProducerRecord_shouldUpdateTenantHeadersInProducerRecord() {
171+
void mapToProducerRecord_shouldUpdateTenantHeadersInProducerRecords() {
186172
var resourceEvent = resourceEvent(null, ResourceType.INSTANCE, CREATE,
187173
mapOf("id", INSTANCE_ID), null);
188174
resourceEvent.tenant(TENANT_ID);
@@ -193,10 +179,10 @@ void mapToProducerRecord_shouldUpdateTenantHeadersInProducerRecord() {
193179
consumerRecord.headers().add(XOkapiHeaders.TENANT,
194180
"old-tenant".getBytes(StandardCharsets.UTF_8));
195181

196-
var result = mapper.mapToProducerRecord(consumerRecord);
182+
var result = mapper.mapToProducerRecords(consumerRecord);
197183

198-
var tenantIdHeader = result.headers().lastHeader(FolioKafkaProperties.TENANT_ID);
199-
var okapiTenantHeader = result.headers().lastHeader(XOkapiHeaders.TENANT);
184+
var tenantIdHeader = result.getFirst().headers().lastHeader(FolioKafkaProperties.TENANT_ID);
185+
var okapiTenantHeader = result.getFirst().headers().lastHeader(XOkapiHeaders.TENANT);
200186

201187
assertThat(tenantIdHeader).isNotNull();
202188
assertThat(new String(tenantIdHeader.value(), StandardCharsets.UTF_8)).isEqualTo(TENANT_ID);
@@ -206,23 +192,23 @@ void mapToProducerRecord_shouldUpdateTenantHeadersInProducerRecord() {
206192
}
207193

208194
@Test
209-
void mapToProducerRecord_shouldHandleNullPayload() {
195+
void mapToProducerRecords_shouldHandleNullPayload() {
210196
// For DELETE events or REINDEX, the ID comes from the key, not from payload
211197
var resourceEvent = resourceEvent(null, ResourceType.INSTANCE, DELETE, null, null);
212198
resourceEvent.tenant(TENANT_ID);
213199

214200
var consumerRecord = createConsumerRecord(INSTANCE_ID, resourceEvent, INSTANCE_TOPIC);
215201

216-
var result = mapper.mapToProducerRecord(consumerRecord);
202+
var result = mapper.mapToProducerRecords(consumerRecord);
217203

218204
// When both new and old are null, extractInstanceId returns null for non-REINDEX non-instance topics
219205
// But for instance topics with DELETE, it should use the key
220-
assertThat(result.value().tenant()).isEqualTo(TENANT_ID);
206+
assertThat(result.getFirst().value().tenant()).isEqualTo(TENANT_ID);
221207
// The instance ID extraction logic depends on topic and event type
222208
}
223209

224210
@Test
225-
void mapToProducerRecord_shouldHandleEventWithoutInstanceId() {
211+
void mapToProducerRecords_shouldHandleEventWithoutInstanceId() {
226212
var boundWithId = randomId();
227213
var boundWithTopic = "folio.test-tenant.inventory.bound-with";
228214
var resourceEvent = resourceEvent(null, ResourceType.INSTANCE, CREATE,
@@ -231,23 +217,23 @@ void mapToProducerRecord_shouldHandleEventWithoutInstanceId() {
231217

232218
var consumerRecord = createConsumerRecord(boundWithId, resourceEvent, boundWithTopic);
233219

234-
var result = mapper.mapToProducerRecord(consumerRecord);
220+
var result = mapper.mapToProducerRecords(consumerRecord);
235221

236222
// Should extract instanceId from payload
237-
assertThat(result.value().instanceId()).isEqualTo(INSTANCE_ID);
223+
assertThat(result.getFirst().value().instanceId()).isEqualTo(INSTANCE_ID);
238224
}
239225

240226
@Test
241-
void mapToProducerRecord_shouldGenerateCorrectTopicName() {
227+
void mapToProducerRecords_shouldGenerateCorrectTopicName() {
242228
var resourceEvent = resourceEvent(null, ResourceType.INSTANCE, CREATE,
243229
mapOf("id", INSTANCE_ID), null);
244230
resourceEvent.tenant(TENANT_ID);
245231

246232
var consumerRecord = createConsumerRecord(INSTANCE_ID, resourceEvent, INSTANCE_TOPIC);
247233

248-
var result = mapper.mapToProducerRecord(consumerRecord);
234+
var result = mapper.mapToProducerRecords(consumerRecord);
249235

250-
assertThat(result.topic())
236+
assertThat(result.getFirst().topic())
251237
.isEqualTo(KafkaConfiguration.SearchTopic.INDEX_INSTANCE.fullTopicName(TENANT_ID));
252238
}
253239

0 commit comments

Comments
 (0)