Skip to content

Commit f01c035

Browse files
committed
feat(indexing): implement a two-stage Kafka processing architecture with event aggregation (#890)
Closes: MSEARCH-1157
1 parent 267a49c commit f01c035

21 files changed

+1090
-391
lines changed

NEWS.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
## v5.0.12 2026-02-16

### Features

* Implement two-stage Kafka processing with event aggregation for instance indexing ([MSEARCH-1157](https://folio-org.atlassian.net/browse/MSEARCH-1157))

---
17
## v5.0.11 2026-01-26
28
### Bug fixes
39
* Process item/instance in batches, add stale sub-resource lock release logic ([MSEARCH-1097](https://folio-org.atlassian.net/browse/MSEARCH-1097))

src/main/java/org/folio/search/configuration/kafka/InstanceResourceEventKafkaConfiguration.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@
66
import java.util.Map;
77
import lombok.RequiredArgsConstructor;
88
import org.folio.search.domain.dto.ResourceEvent;
9+
import org.folio.search.model.event.IndexInstanceEvent;
910
import org.springframework.beans.factory.annotation.Value;
1011
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
1112
import org.springframework.context.annotation.Bean;
1213
import org.springframework.context.annotation.Configuration;
1314
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
15+
import org.springframework.kafka.core.KafkaTemplate;
16+
import org.springframework.kafka.core.ProducerFactory;
1417
import org.springframework.kafka.listener.BatchInterceptor;
1518
import org.springframework.kafka.listener.CompositeBatchInterceptor;
1619
import org.springframework.kafka.support.serializer.JsonDeserializer;
@@ -41,4 +44,23 @@ public ConcurrentKafkaListenerContainerFactory<String, ResourceEvent> instanceRe
4144
factory.setBatchInterceptor(new CompositeBatchInterceptor<>(batchInterceptors));
4245
return factory;
4346
}
47+
48+
@Bean
49+
public ConcurrentKafkaListenerContainerFactory<String, IndexInstanceEvent> indexInstanceListenerContainerFactory(
50+
@Value("#{folioKafkaProperties.listener['index-instance'].maxPollRecords}") Integer maxPollRecords,
51+
@Value("#{folioKafkaProperties.listener['index-instance'].maxPollIntervalMs}") Integer maxPollIntervalMs) {
52+
var factory = new ConcurrentKafkaListenerContainerFactory<String, IndexInstanceEvent>();
53+
factory.setBatchListener(true);
54+
var deserializer = new JsonDeserializer<>(IndexInstanceEvent.class, false);
55+
var overrideProperties = Map.<String, Object>of(MAX_POLL_RECORDS_CONFIG, maxPollRecords,
56+
MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs);
57+
factory.setConsumerFactory(getConsumerFactory(deserializer, kafkaProperties, overrideProperties));
58+
return factory;
59+
}
60+
61+
@Bean
62+
public KafkaTemplate<String, IndexInstanceEvent> indexInstanceKafkaTemplate() {
63+
ProducerFactory<String, IndexInstanceEvent> producerFactory = getProducerFactory(kafkaProperties);
64+
return new KafkaTemplate<>(producerFactory);
65+
}
4466
}

src/main/java/org/folio/search/configuration/kafka/KafkaConfiguration.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ protected static <T> DefaultKafkaConsumerFactory<String, T> getConsumerFactory(J
5151

5252
public enum SearchTopic implements FolioKafkaTopic {
5353
REINDEX_RANGE_INDEX("search.reindex.range-index"),
54-
INDEX_SUB_RESOURCE("search.index.sub-resource");
54+
INDEX_SUB_RESOURCE("search.index.sub-resource"),
55+
INDEX_INSTANCE("search.index.instance");
5556

5657
private final String topicName;
5758

@@ -69,5 +70,4 @@ public String envId() {
6970
return FolioEnvironment.getFolioEnvName();
7071
}
7172
}
72-
7373
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package org.folio.search.integration.message;
2+
3+
import static org.folio.search.configuration.kafka.KafkaConfiguration.SearchTopic.INDEX_INSTANCE;
4+
import static org.folio.search.utils.SearchConverterUtils.getEventPayload;
5+
import static org.folio.search.utils.SearchConverterUtils.getNewAsMap;
6+
import static org.folio.search.utils.SearchConverterUtils.getOldAsMap;
7+
import static org.folio.search.utils.SearchUtils.ID_FIELD;
8+
import static org.folio.search.utils.SearchUtils.INSTANCE_ID_FIELD;
9+
10+
import java.util.ArrayList;
11+
import java.util.List;
12+
import java.util.Map;
13+
import java.util.Optional;
14+
import lombok.RequiredArgsConstructor;
15+
import org.apache.commons.collections4.MapUtils;
16+
import org.apache.kafka.clients.consumer.ConsumerRecord;
17+
import org.apache.kafka.clients.producer.ProducerRecord;
18+
import org.apache.kafka.common.header.Headers;
19+
import org.folio.search.domain.dto.ResourceEvent;
20+
import org.folio.search.model.event.IndexInstanceEvent;
21+
import org.folio.search.model.types.ResourceType;
22+
import org.folio.search.service.consortium.ConsortiumTenantService;
23+
import org.springframework.stereotype.Component;
24+
25+
/**
26+
* Maps instance resource events to index instance events with producer records.
27+
*/
28+
@Component
29+
@RequiredArgsConstructor
30+
public class InstanceEventMapper {
31+
32+
private final ConsortiumTenantService consortiumTenantService;
33+
34+
/**
35+
* Maps a consumer record to a producer record for indexing.
36+
*
37+
* @param event the consumer record containing resource event
38+
* @return producer record ready to be sent to Kafka
39+
*/
40+
public List<ProducerRecord<String, IndexInstanceEvent>> mapToProducerRecords(
41+
ConsumerRecord<String, ResourceEvent> event) {
42+
var resourceEvent = event.value();
43+
var eventTenant = resourceEvent.getTenant();
44+
var targetTenant = consortiumTenantService.getCentralTenant(eventTenant).orElse(eventTenant);
45+
if (isInstanceResource(resourceEvent)) {
46+
var instanceId = MapUtils.getString(getEventPayload(resourceEvent), ID_FIELD);
47+
return toProducerRecord(instanceId, targetTenant, event.headers()).map(List::of).orElseGet(List::of);
48+
} else {
49+
var oldInstanceId = getInstanceId(getOldAsMap(resourceEvent));
50+
var newInstanceId = getInstanceId(getNewAsMap(resourceEvent));
51+
if (oldInstanceId != null && newInstanceId != null && !oldInstanceId.equals(newInstanceId)) {
52+
List<ProducerRecord<String, IndexInstanceEvent>> records = new ArrayList<>();
53+
var oldProducerRecord = toProducerRecord(oldInstanceId, targetTenant, event.headers());
54+
var newProducerRecord = toProducerRecord(newInstanceId, targetTenant, event.headers());
55+
oldProducerRecord.ifPresent(records::add);
56+
newProducerRecord.ifPresent(records::add);
57+
return records;
58+
}
59+
var instanceId = newInstanceId == null ? oldInstanceId : newInstanceId;
60+
return toProducerRecord(instanceId, targetTenant, event.headers()).map(List::of).orElseGet(List::of);
61+
}
62+
}
63+
64+
private String getInstanceId(Map<String, Object> oldMap) {
65+
return MapUtils.getString(oldMap, INSTANCE_ID_FIELD);
66+
}
67+
68+
private boolean isInstanceResource(ResourceEvent resourceEvent) {
69+
return ResourceType.byName(resourceEvent.getResourceName()).equals(ResourceType.INSTANCE);
70+
}
71+
72+
private Optional<ProducerRecord<String, IndexInstanceEvent>> toProducerRecord(String instanceId,
73+
String targetTenant,
74+
Headers headers) {
75+
if (instanceId == null || targetTenant == null) {
76+
return Optional.empty();
77+
}
78+
var topic = getFullTopicName(targetTenant);
79+
var value = new IndexInstanceEvent(targetTenant, instanceId);
80+
81+
return Optional.of(new ProducerRecordBuilder<>(topic, instanceId, value, headers)
82+
.withUpdatedTenantHeaders(targetTenant));
83+
}
84+
85+
private String getFullTopicName(String targetTenant) {
86+
return INDEX_INSTANCE.fullTopicName(targetTenant);
87+
}
88+
}

src/main/java/org/folio/search/integration/message/KafkaMessageListener.java

Lines changed: 41 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,13 @@
11
package org.folio.search.integration.message;
22

3-
import static org.apache.commons.collections4.MapUtils.getString;
4-
import static org.apache.commons.lang3.RegExUtils.replaceAll;
53
import static org.folio.search.configuration.RetryTemplateConfiguration.KAFKA_RETRY_TEMPLATE_NAME;
64
import static org.folio.search.configuration.SearchCacheNames.REFERENCE_DATA_CACHE;
7-
import static org.folio.search.domain.dto.ResourceEventType.CREATE;
85
import static org.folio.search.domain.dto.ResourceEventType.DELETE;
9-
import static org.folio.search.domain.dto.ResourceEventType.REINDEX;
10-
import static org.folio.search.utils.SearchConverterUtils.getEventPayload;
116
import static org.folio.search.utils.SearchConverterUtils.getResourceEventId;
127
import static org.folio.search.utils.SearchConverterUtils.getResourceSource;
13-
import static org.folio.search.utils.SearchUtils.ID_FIELD;
14-
import static org.folio.search.utils.SearchUtils.INSTANCE_ID_FIELD;
158
import static org.folio.search.utils.SearchUtils.SOURCE_CONSORTIUM_PREFIX;
169

1710
import java.util.List;
18-
import java.util.Objects;
1911
import java.util.function.Consumer;
2012
import java.util.function.Predicate;
2113
import java.util.stream.Collectors;
@@ -25,6 +17,7 @@
2517
import org.apache.kafka.clients.consumer.ConsumerRecord;
2618
import org.apache.logging.log4j.message.FormattedMessage;
2719
import org.folio.search.domain.dto.ResourceEvent;
20+
import org.folio.search.model.event.IndexInstanceEvent;
2821
import org.folio.search.model.types.ResourceType;
2922
import org.folio.search.service.ResourceService;
3023
import org.folio.search.service.config.ConfigSynchronizationService;
@@ -33,6 +26,7 @@
3326
import org.folio.spring.service.SystemUserScopedExecutionService;
3427
import org.springframework.cache.annotation.CacheEvict;
3528
import org.springframework.kafka.annotation.KafkaListener;
29+
import org.springframework.kafka.core.KafkaTemplate;
3630
import org.springframework.stereotype.Component;
3731

3832
/**
@@ -47,6 +41,8 @@ public class KafkaMessageListener {
4741
private final FolioMessageBatchProcessor folioMessageBatchProcessor;
4842
private final SystemUserScopedExecutionService executionService;
4943
private final ConfigSynchronizationService configSynchronizationService;
44+
private final KafkaTemplate<String, IndexInstanceEvent> instanceEventProducer;
45+
private final InstanceEventMapper instanceEventMapper;
5046

5147
/**
5248
* Handles instance events and indexes them by id.
@@ -61,14 +57,36 @@ public class KafkaMessageListener {
6157
concurrency = "#{folioKafkaProperties.listener['events'].concurrency}")
6258
public void handleInstanceEvents(List<ConsumerRecord<String, ResourceEvent>> consumerRecords) {
6359
log.info("Processing instance related events from kafka events [number of events: {}]", consumerRecords.size());
64-
var batch = getInstanceResourceEvents(consumerRecords);
65-
var batchByTenant = batch.stream().collect(Collectors.groupingBy(ResourceEvent::getTenant));
60+
consumerRecords.stream().collect(Collectors.groupingBy(consumerRecord -> consumerRecord.value().getTenant()))
61+
.forEach((tenant, records) -> executionService.executeSystemUserScoped(tenant, () -> {
62+
records.stream()
63+
.map(instanceEventMapper::mapToProducerRecords)
64+
.flatMap(List::stream)
65+
.forEach(instanceEventProducer::send);
66+
return null;
67+
}));
68+
}
69+
70+
/**
71+
* Handles instance events and indexes them by id.
72+
*
73+
* @param consumerRecords - list of consumer records from Apache Kafka to process.
74+
*/
75+
@KafkaListener(
76+
id = KafkaConstants.INDEX_INSTANCE_LISTENER_ID,
77+
containerFactory = "indexInstanceListenerContainerFactory",
78+
topicPattern = "#{folioKafkaProperties.listener['index-instance'].topicPattern}",
79+
groupId = "#{folioKafkaProperties.listener['index-instance'].groupId}",
80+
concurrency = "#{folioKafkaProperties.listener['index-instance'].concurrency}")
81+
public void handleIndexInstanceEvents(List<ConsumerRecord<String, IndexInstanceEvent>> consumerRecords) {
82+
log.info("Processing index instance events from kafka [number of events: {}]", consumerRecords.size());
83+
var batchByTenant = consumerRecords.stream().map(ConsumerRecord::value)
84+
.collect(Collectors.groupingBy(IndexInstanceEvent::tenant));
6685
batchByTenant.forEach((tenant, resourceEvents) -> executionService.executeSystemUserScoped(tenant, () -> {
6786
folioMessageBatchProcessor.consumeBatchWithFallback(resourceEvents, KAFKA_RETRY_TEMPLATE_NAME,
68-
resourceService::indexInstancesById, KafkaMessageListener::logFailedEvent);
87+
resourceService::indexInstanceEvents, KafkaMessageListener::logFailedEvent);
6988
return null;
7089
}));
71-
7290
}
7391

7492
/**
@@ -164,38 +182,6 @@ private void indexResources(List<ResourceEvent> batch, Consumer<List<ResourceEve
164182
}));
165183
}
166184

167-
private static List<ResourceEvent> getInstanceResourceEvents(List<ConsumerRecord<String, ResourceEvent>> events) {
168-
return events.stream()
169-
.map(KafkaMessageListener::getInstanceResourceEvent)
170-
.filter(Objects::nonNull)
171-
.distinct()
172-
.toList();
173-
}
174-
175-
private static ResourceEvent getInstanceResourceEvent(ConsumerRecord<String, ResourceEvent> consumerRecord) {
176-
var instanceId = getInstanceId(consumerRecord);
177-
var value = consumerRecord.value();
178-
if (instanceId == null) {
179-
log.warn("Failed to find instance id in record [record: {}]", replaceAll(value.toString(), "\\s+", " "));
180-
return null;
181-
}
182-
var operation = isInstanceResource(consumerRecord) ? value.getType() : CREATE;
183-
return value.id(instanceId).type(operation);
184-
}
185-
186-
private static String getInstanceId(ConsumerRecord<String, ResourceEvent> event) {
187-
var body = event.value();
188-
if (body.getType() == REINDEX) {
189-
return event.key();
190-
}
191-
var eventPayload = getEventPayload(body);
192-
return isInstanceResource(event) ? getString(eventPayload, ID_FIELD) : getString(eventPayload, INSTANCE_ID_FIELD);
193-
}
194-
195-
private static boolean isInstanceResource(ConsumerRecord<String, ResourceEvent> consumerRecord) {
196-
return consumerRecord.topic().endsWith("inventory.instance");
197-
}
198-
199185
private static void logFailedEvent(ResourceEvent event, Exception e) {
200186
if (event == null) {
201187
log.warn("Failed to index resource event [event: null]", e);
@@ -210,4 +196,15 @@ private static void logFailedEvent(ResourceEvent event, Exception e) {
210196
), e);
211197
}
212198

199+
private static void logFailedEvent(IndexInstanceEvent event, Exception e) {
200+
if (event == null) {
201+
log.warn("Failed to index resource event [event: null]", e);
202+
return;
203+
}
204+
205+
log.warn(new FormattedMessage(
206+
"Failed to index instance event [tenantId: {}, id: {}]",
207+
event.tenant(), event.instanceId()
208+
), e);
209+
}
213210
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package org.folio.search.integration.message;
2+
3+
import static org.folio.spring.tools.kafka.FolioKafkaProperties.TENANT_ID;
4+
5+
import java.nio.charset.StandardCharsets;
6+
import org.apache.kafka.clients.producer.ProducerRecord;
7+
import org.apache.kafka.common.header.Headers;
8+
import org.folio.spring.integration.XOkapiHeaders;
9+
10+
/**
11+
* Builder for creating Kafka producer records with proper header management.
12+
*/
13+
public class ProducerRecordBuilder<K, V> {
14+
15+
private final String topic;
16+
private final K key;
17+
private final V value;
18+
private final Headers headers;
19+
20+
public ProducerRecordBuilder(String topic, K key, V value, Headers headers) {
21+
this.topic = topic;
22+
this.key = key;
23+
this.value = value;
24+
this.headers = headers;
25+
}
26+
27+
/**
28+
* Updates tenant-related headers and builds the producer record.
29+
*
30+
* @param targetTenant the target tenant ID
31+
* @return a new ProducerRecord with updated headers
32+
*/
33+
public ProducerRecord<K, V> withUpdatedTenantHeaders(String targetTenant) {
34+
var targetTenantBytes = targetTenant.getBytes(StandardCharsets.UTF_8);
35+
36+
var producerRecord = new ProducerRecord<>(topic, key, value);
37+
copyHeaders(headers, producerRecord.headers(), targetTenantBytes);
38+
39+
return producerRecord;
40+
}
41+
42+
private void copyHeaders(Headers source, Headers destination, byte[] targetTenantBytes) {
43+
source.forEach(header -> {
44+
var headerKey = header.key();
45+
if (isTenantHeader(headerKey)) {
46+
destination.add(headerKey, targetTenantBytes);
47+
} else {
48+
destination.add(headerKey, header.value());
49+
}
50+
});
51+
}
52+
53+
private boolean isTenantHeader(String key) {
54+
return TENANT_ID.equals(key) || XOkapiHeaders.TENANT.equals(key);
55+
}
56+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package org.folio.search.model.event;
2+
3+
public record IndexInstanceEvent(String tenant, String instanceId) { }

0 commit comments

Comments
 (0)