Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* Use unique consumer group per instance for inventory classification-type and call-number-type topics ([MSEARCH-1092](https://folio-org.atlassian.net/browse/MSEARCH-1092))
* Remove identifiers from keyword alias for instance and authority mappings ([MSEARCH-1118](https://folio-org.atlassian.net/browse/MSEARCH-1118))
* Change `all` query builder to use full term for multimatch search ([MSEARCH-1112](https://folio-org.atlassian.net/browse/MSEARCH-1112))
* Create a new listener for the CONSORTIUM_INSTANCE_SHARING_COMPLETE event to update the call number’s last_updated_date when an instance is shared to the central tenant ([MSEARCH-1168](https://folio-org.atlassian.net/browse/MSEARCH-1168))
* **Authority Search**
* Implement two-stage Kafka processing with event aggregation for instance indexing ([MSEARCH-1157](https://folio-org.atlassian.net/browse/MSEARCH-1157))
* Separate LCCN and Canceled LCCN identifiers search to lccn and canceledLccn options ([MSEARCH-1066](https://folio-org.atlassian.net/browse/MSEARCH-1066))
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ and [Cross-cluster replication](https://docs.aws.amazon.com/opensearch-service/l
| KAFKA_AUTHORITIES_CONCURRENCY | 1 | Custom number of kafka concurrent threads for authority message consuming. |
| KAFKA_LOCATION_CONCURRENCY | 1 | Custom number of kafka concurrent threads for inventory.location, inventory.campus, inventory.institution and inventory.library message consuming. |
| KAFKA_LINKED_DATA_CONCURRENCY | 1 | Custom number of kafka concurrent threads for linked data message consuming. |
| KAFKA_CONSORTIUM_INSTANCE_SHARING_COMPLETE_CONCURRENCY | 1 | Custom number of kafka concurrent threads for consortium instance sharing complete message consuming. |
| KAFKA_REINDEX_RANGE_INDEX_CONCURRENCY | 1 | Custom number of kafka concurrent threads for `search.reindex.range-index` message consuming. |
| KAFKA_REINDEX_RANGE_INDEX_TOPIC_PARTITIONS | 16 | Amount of partitions for `search.reindex.range-index` topic. |
| KAFKA_REINDEX_RANGE_INDEX_TOPIC_REPLICATION_FACTOR | - | Replication factor for `search.reindex.range-index` topic. |
Expand Down
5 changes: 5 additions & 0 deletions descriptors/ModuleDescriptor-template.json
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,11 @@
"value": "1",
"description": "Custom number of kafka concurrent threads for authority message consuming."
},
{
"name": "KAFKA_CONSORTIUM_INSTANCE_SHARING_COMPLETE_CONCURRENCY",
"value": "1",
"description": "Custom number of kafka concurrent threads for consortium instance sharing complete message consuming."
},
{
"name": "KAFKA_REINDEX_RANGE_INDEX_CONCURRENCY",
"value": "1",
Expand Down
1 change: 1 addition & 0 deletions docker/kafka-init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ TOPICS=(
"${ENV}.Default.linked-data.hub"
"${ENV}.Default.search.reindex.range-index"
"${ENV}.Default.inventory.reindex-records"
"${ENV}.Default.ALL.CONSORTIUM_INSTANCE_SHARING_COMPLETE"
)

# Updated to use the full path to kafka-topics.sh
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.folio.search.configuration.kafka;

import java.util.Collections;
import lombok.RequiredArgsConstructor;
import org.folio.search.model.event.InstanceSharingCompleteEvent;
import org.springframework.boot.kafka.autoconfigure.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.support.serializer.JacksonJsonDeserializer;

@Configuration
@RequiredArgsConstructor
public class InstanceSharingCompleteEventKafkaConfiguration extends KafkaConfiguration {

private final KafkaProperties kafkaProperties;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, InstanceSharingCompleteEvent>
instanceSharingCompleteListenerContainerFactory(CommonErrorHandler commonErrorHandler) {

var factory = new ConcurrentKafkaListenerContainerFactory<String, InstanceSharingCompleteEvent>();
var deserializer = new JacksonJsonDeserializer<>(InstanceSharingCompleteEvent.class, false);
factory.setConsumerFactory(getConsumerFactory(deserializer, kafkaProperties, Collections.emptyMap()));
factory.setCommonErrorHandler(commonErrorHandler);
return factory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Strings;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.logging.log4j.message.FormattedMessage;
import org.folio.search.domain.dto.ResourceEvent;
import org.folio.search.model.event.IndexInstanceEvent;
import org.folio.search.model.event.InstanceSharingCompleteEvent;
import org.folio.search.model.types.ResourceType;
import org.folio.search.service.ResourceService;
import org.folio.search.service.config.ConfigSynchronizationService;
import org.folio.search.service.consortium.ConsortiumTenantProvider;
import org.folio.search.service.reindex.jdbc.CallNumberRepository;
import org.folio.search.utils.KafkaConstants;
import org.folio.search.utils.SearchConverterUtils;
import org.folio.spring.service.SystemUserScopedExecutionService;
Expand All @@ -43,6 +47,8 @@ public class KafkaMessageListener {
private final ConfigSynchronizationService configSynchronizationService;
private final KafkaTemplate<String, IndexInstanceEvent> instanceEventProducer;
private final InstanceEventMapper instanceEventMapper;
private final CallNumberRepository callNumberRepository;
private final ConsortiumTenantProvider consortiumTenantProvider;

/**
* Handles instance events and indexes them by id.
Expand Down Expand Up @@ -172,6 +178,30 @@ public void handleLinkedDataEvents(List<ConsumerRecord<String, ResourceEvent>> c
indexResources(batch, resourceService::indexResources);
}

@KafkaListener(
id = KafkaConstants.INSTANCE_SHARING_COMPLETE_LISTENER_ID,
containerFactory = "instanceSharingCompleteListenerContainerFactory",
groupId = "#{folioKafkaProperties.listener['instance-sharing-complete'].groupId}",
concurrency = "#{folioKafkaProperties.listener['instance-sharing-complete'].concurrency}",
topicPattern = "#{folioKafkaProperties.listener['instance-sharing-complete'].topicPattern}")
public void handleInstanceSharingCompleteEvent(InstanceSharingCompleteEvent instanceSharingCompleteEvent) {
log.info("Processing consortium instance sharing complete event from Kafka ");

if (InstanceSharingCompleteEvent.Status.COMPLETE.equals(instanceSharingCompleteEvent.getStatus())
&& StringUtils.isEmpty(instanceSharingCompleteEvent.getError())) {

var tenant = instanceSharingCompleteEvent.getTargetTenantId();
executionService.executeSystemUserScoped(tenant, () -> {
if (consortiumTenantProvider.isCentralTenant(tenant)) {
log.info("handleInstanceSharingCompleteEvent: Updating lastUpdatedDate for call numbers of instance "
+ "in central tenant {}", tenant);
callNumberRepository.updateLastUpdatedDate(instanceSharingCompleteEvent.getInstanceIdentifier());
}
return null;
});
}
}

private void indexResources(List<ResourceEvent> batch, Consumer<List<ResourceEvent>> indexConsumer) {
var batchByTenant = batch.stream().collect(Collectors.groupingBy(ResourceEvent::getTenant));

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.folio.search.model.event;

import com.fasterxml.jackson.annotation.JsonValue;
import java.util.UUID;
import lombok.Data;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

@Data
public class InstanceSharingCompleteEvent {
private UUID id;
private String instanceIdentifier;
private String sourceTenantId;
private String targetTenantId;
private Status status;
private String error;

@Getter
@RequiredArgsConstructor
public enum Status {

IN_PROGRESS("IN_PROGRESS"),
COMPLETE("COMPLETE"),
ERROR("ERROR");

@JsonValue
private final String value;
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
package org.folio.search.service;

import static org.folio.search.utils.SearchConverterUtils.getMapValueByPath;
import static org.folio.search.utils.SearchConverterUtils.getNewAsMap;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.extern.log4j.Log4j2;
import org.folio.search.domain.dto.ResourceEvent;
import org.folio.search.domain.dto.ResourceEventType;
import org.folio.search.model.types.ResourceType;
import org.folio.search.service.consortium.ConsortiumTenantProvider;
import org.folio.search.service.converter.preprocessor.extractor.ChildResourceExtractor;
import org.folio.search.service.reindex.jdbc.CallNumberRepository;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

Expand All @@ -28,15 +23,12 @@ public class InstanceChildrenResourceService {

private final Map<ResourceType, List<ChildResourceExtractor>> resourceExtractors;
private final ConsortiumTenantProvider consortiumTenantProvider;
private final CallNumberRepository callNumberRepository;

public InstanceChildrenResourceService(List<ChildResourceExtractor> resourceExtractors,
ConsortiumTenantProvider consortiumTenantProvider,
CallNumberRepository callNumberRepository) {
ConsortiumTenantProvider consortiumTenantProvider) {
this.resourceExtractors = resourceExtractors.stream()
.collect(Collectors.groupingBy(ChildResourceExtractor::resourceType));
this.consortiumTenantProvider = consortiumTenantProvider;
this.callNumberRepository = callNumberRepository;
}

public void persistChildren(String tenantId, ResourceType resourceType, List<ResourceEvent> events) {
Expand All @@ -50,37 +42,6 @@ public void persistChildren(String tenantId, ResourceType resourceType, List<Res
// Process child resources normally
extractors.forEach(resourceExtractor ->
resourceExtractor.persistChildren(tenantId, shared, events));

// When background job processes new instances in central tenant, update call numbers
// that may still be pointing to member tenant. Covers sharing instance case.
if (shared && resourceType == ResourceType.INSTANCE && !events.isEmpty()) {
var instanceIds = events.stream()
.filter(this::isNewInstance)
.map(ResourceEvent::getId)
.toList();
log.info("persistChildren: Updating call number tenant_id for {} instances in central tenant {}",
instanceIds.size(), tenantId);
callNumberRepository.updateTenantIdForCentralInstances(instanceIds, tenantId);
}
}

/**
* Checks if the instance is newly created by comparing metadata dates.
* An instance is considered new if its createdDate equals its updatedDate.
*
* @param event the resource event to check
* @return true if the instance is newly created, false otherwise
*/
private Boolean isNewInstance(ResourceEvent event) {
var instanceData = getNewAsMap(event);
if (instanceData.isEmpty()) {
return false;
}

var createdDate = getMapValueByPath("metadata.createdDate", instanceData);
var updatedDate = getMapValueByPath("metadata.updatedDate", instanceData);

return Objects.equals(createdDate, updatedDate);
}

public void persistChildrenOnReindex(String tenantId, ResourceType resourceType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,12 @@ WHERE item_id IN (%2$s) %3$s
WHERE id IN (SELECT * FROM deleted_ids);
""";

private static final String UPDATE_TENANT_FOR_CENTRAL_QUERY = """
WITH updated_call_numbers AS (
UPDATE %1$s.instance_call_number
SET tenant_id = ?
WHERE instance_id IN (%2$s)
AND tenant_id != ?
RETURNING call_number_id
)
UPDATE %1$s.call_number
private static final String UPDATE_LAST_UPDATED_DATE_QUERY = """
UPDATE %1$s.call_number cn
SET last_updated_date = CURRENT_TIMESTAMP
WHERE id IN (SELECT call_number_id FROM updated_call_numbers);
FROM %1$s.instance_call_number icn
WHERE icn.instance_id = ?::uuid
AND cn.id = icn.call_number_id;
""";

private static final String SELECT_BY_UPDATED_QUERY = """
Expand Down Expand Up @@ -190,22 +185,15 @@ public void deleteByInstanceIds(List<String> itemIds, String tenantId) {
}

/**
* Updates tenant_id in instance_call_number records for the given instances to the central tenant.
* Used when an instance is shared to the central tenant - the background job processes the newly created
* central instance and this method updates any existing call number relations that still point to the member tenant.
* For the updated relations - also updates last_updated_date in call_number table
* to trigger reindexing of those call numbers.
* Updates last_updated_date in call_number records for the given instance to trigger reindexing
* of those call numbers when an instance is shared to the central tenant .
*
* @param instanceIds list of instance IDs whose call number relations should be updated
* @param centralTenantId the central tenant ID to set
* @param instanceId instance ID whose call number relations should be updated
*/
public void updateTenantIdForCentralInstances(List<String> instanceIds, String centralTenantId) {
var sql = UPDATE_TENANT_FOR_CENTRAL_QUERY.formatted(JdbcUtils.getSchemaName(context),
getParamPlaceholderForUuid(instanceIds.size()));
var params = Stream.of(List.of(centralTenantId), instanceIds, List.of(centralTenantId))
.flatMap(List::stream)
.toArray();
jdbcTemplate.update(sql, params);
@SuppressWarnings("java:S2077")
public void updateLastUpdatedDate(String instanceId) {
var sql = UPDATE_LAST_UPDATED_DATE_QUERY.formatted(JdbcUtils.getSchemaName(context));
jdbcTemplate.update(sql, instanceId);
}

@Override
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/folio/search/utils/KafkaConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class KafkaConstants {
public static final String BROWSE_CONFIG_DATA_LISTENER_ID = "mod-search-browse-config-data-listener";
public static final String LOCATION_LISTENER_ID = "mod-search-location-listener";
public static final String LINKED_DATA_LISTENER_ID = "mod-search-linked-data-listener";
public static final String INSTANCE_SHARING_COMPLETE_LISTENER_ID = "mod-search-instance-sharing-complete-listener";
public static final String REINDEX_RANGE_INDEX_LISTENER_ID = "mod-search-reindex-index-listener";
public static final String REINDEX_RECORDS_LISTENER_ID = "mod-search-reindex-records-listener";
}
4 changes: 4 additions & 0 deletions src/main/resources/application-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ folio:
concurrency: ${KAFKA_LINKED_DATA_CONCURRENCY:1}
topic-pattern: (${folio.environment}\.)(.*\.)linked-data\.(instance|work|hub)
group-id: ${folio.environment}-mod-search-linked-data-group
instance-sharing-complete:
concurrency: ${KAFKA_CONSORTIUM_INSTANCE_SHARING_COMPLETE_CONCURRENCY:1}
topic-pattern: (${folio.environment}\.)[A-Za-z0-9-]+\.\w+\.CONSORTIUM_INSTANCE_SHARING_COMPLETE
group-id: ${folio.environment}-mod-search-consortium-instance-sharing-complete-group
reindex-range-index:
concurrency: ${KAFKA_REINDEX_RANGE_INDEX_CONCURRENCY:2}
topic-pattern: (${folio.environment}\.)(.*\.)search\.reindex\.range-index
Expand Down
6 changes: 5 additions & 1 deletion src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ folio:
concurrency: ${KAFKA_LINKED_DATA_CONCURRENCY:1}
topic-pattern: (${folio.environment}\.)(.*\.)linked-data\.(instance|work|hub)
group-id: ${folio.environment}-mod-search-linked-data-group
instance-sharing-complete:
concurrency: ${KAFKA_CONSORTIUM_INSTANCE_SHARING_COMPLETE_CONCURRENCY:1}
topic-pattern: (${folio.environment}\.)[A-Za-z0-9-]+\.\w+\.CONSORTIUM_INSTANCE_SHARING_COMPLETE
group-id: ${folio.environment}-mod-search-consortium-instance-sharing-complete-group
reindex-range-index:
concurrency: ${KAFKA_REINDEX_RANGE_INDEX_CONCURRENCY:2}
topic-pattern: (${folio.environment}\.)(.*\.)search\.reindex\.range-index
Expand Down Expand Up @@ -193,4 +197,4 @@ logging:
org.apache.kafka.common.config.AbstractConfig: warn
org.apache.kafka.common.metrics: warn
org.apache.kafka.common.utils.AppInfoParser: warn
org.apache.kafka.common.telemetry: warn
org.apache.kafka.common.telemetry: warn
Loading
Loading