Skip to content

Commit 624c02e

Browse files
committed
Merge branch 'master' into feature/ecs-member-reindex
# Conflicts: # src/main/java/org/folio/search/service/reindex/jdbc/CallNumberRepository.java # src/test/java/org/folio/search/service/reindex/jdbc/CallNumberRepositoryIT.java
2 parents 56d7a3f + a8db760 commit 624c02e

31 files changed

+367
-188
lines changed

NEWS.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323
* Use unique consumer group per instance for inventory classification-type and call-number-type topics ([MSEARCH-1092](https://folio-org.atlassian.net/browse/MSEARCH-1092))
2424
* Remove identifiers from keyword alias for instance and authority mappings ([MSEARCH-1118](https://folio-org.atlassian.net/browse/MSEARCH-1118))
2525
* Change `all` query builder to use full term for multimatch search ([MSEARCH-1112](https://folio-org.atlassian.net/browse/MSEARCH-1112))
26+
* Create a new listener for the CONSORTIUM_INSTANCE_SHARING_COMPLETE event to update the call number’s last_updated_date when an instance is shared to the central tenant ([MSEARCH-1168](https://folio-org.atlassian.net/browse/MSEARCH-1168))
2627
* **Authority Search**
2728
* Implement two-stage Kafka processing with event aggregation for instance indexing ([MSEARCH-1157](https://folio-org.atlassian.net/browse/MSEARCH-1157))
2829
* Separate LCCN and Canceled LCCN identifiers search to lccn and canceledLccn options ([MSEARCH-1066](https://folio-org.atlassian.net/browse/MSEARCH-1066))
30+
* Use Authority-Specific Identifier Types for LCCN Lookups ([MSEARCH-1193](https://folio-org.atlassian.net/browse/MSEARCH-1193))
2931
* **Classification Browse**
3032
* Add title and contributors to classification browse response ([MSEARCH-1045](https://folio-org.atlassian.net/browse/MSEARCH-1045))
3133
* Add id to classification browse response ([MSEARCH-1093](https://folio-org.atlassian.net/browse/MSEARCH-1093))

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@ and [Cross-cluster replication](https://docs.aws.amazon.com/opensearch-service/l
293293
| KAFKA_AUTHORITIES_CONCURRENCY | 1 | Custom number of kafka concurrent threads for authority message consuming. |
294294
| KAFKA_LOCATION_CONCURRENCY | 1 | Custom number of kafka concurrent threads for inventory.location, inventory.campus, inventory.institution and inventory.library message consuming. |
295295
| KAFKA_LINKED_DATA_CONCURRENCY | 1 | Custom number of kafka concurrent threads for linked data message consuming. |
296+
| KAFKA_CONSORTIUM_INSTANCE_SHARING_COMPLETE_CONCURRENCY | 1 | Custom number of kafka concurrent threads for consortium instance sharing complete message consuming. |
296297
| KAFKA_REINDEX_RANGE_INDEX_CONCURRENCY | 1 | Custom number of kafka concurrent threads for `search.reindex.range-index` message consuming. |
297298
| KAFKA_REINDEX_RANGE_INDEX_TOPIC_PARTITIONS | 16 | Amount of partitions for `search.reindex.range-index` topic. |
298299
| KAFKA_REINDEX_RANGE_INDEX_TOPIC_REPLICATION_FACTOR | - | Replication factor for `search.reindex.range-index` topic. |

descriptors/ModuleDescriptor-template.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,6 +652,10 @@
652652
"id": "item-storage",
653653
"version": "11.0"
654654
},
655+
{
656+
"id": "authority-identifier-types",
657+
"version": "1.0"
658+
},
655659
{
656660
"id": "authority-reindex",
657661
"version": "0.1"
@@ -932,6 +936,7 @@
932936
"perms.users.item.post",
933937
"perms.users.assign.immutable",
934938
"perms.users.assign.mutable",
939+
"authority-identifier-types.collection.get",
935940
"inventory-storage.instance.reindex.post",
936941
"authority-storage.authority.reindex.post",
937942
"inventory-storage.inventory-view.instances.collection.get",
@@ -1039,6 +1044,11 @@
10391044
"value": "1",
10401045
"description": "Custom number of kafka concurrent threads for authority message consuming."
10411046
},
1047+
{
1048+
"name": "KAFKA_CONSORTIUM_INSTANCE_SHARING_COMPLETE_CONCURRENCY",
1049+
"value": "1",
1050+
"description": "Custom number of kafka concurrent threads for consortium instance sharing complete message consuming."
1051+
},
10421052
{
10431053
"name": "KAFKA_REINDEX_RANGE_INDEX_CONCURRENCY",
10441054
"value": "1",

docker/kafka-init.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ TOPICS=(
2626
"${ENV}.Default.linked-data.hub"
2727
"${ENV}.Default.search.reindex.range-index"
2828
"${ENV}.Default.inventory.reindex-records"
29+
"${ENV}.Default.ALL.CONSORTIUM_INSTANCE_SHARING_COMPLETE"
2930
)
3031

3132
# Updated to use the full path to kafka-topics.sh

pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>org.springframework.boot</groupId>
99
<artifactId>spring-boot-starter-parent</artifactId>
10-
<version>4.0.3</version>
10+
<version>4.0.4</version>
1111
<relativePath/>
1212
</parent>
1313

@@ -55,7 +55,7 @@
5555

5656
<!-- Plugins versions -->
5757
<folio-module-descriptor-validator.version>1.0.1</folio-module-descriptor-validator.version>
58-
<maven-openapi-generator-plugin.version>7.20.0</maven-openapi-generator-plugin.version>
58+
<maven-openapi-generator-plugin.version>7.21.0</maven-openapi-generator-plugin.version>
5959
<maven-copy-rename-plugin.version>1.0.1</maven-copy-rename-plugin.version>
6060
<maven-build-helper-plugin.version>3.6.1</maven-build-helper-plugin.version>
6161
<maven-clean-plugin.version>3.5.0</maven-clean-plugin.version>
@@ -313,7 +313,7 @@
313313
<dependency>
314314
<groupId>org.testcontainers</groupId>
315315
<artifactId>testcontainers-elasticsearch</artifactId>
316-
<version>2.0.3</version>
316+
<version>2.0.4</version>
317317
<scope>test</scope>
318318
</dependency>
319319

src/main/java/org/folio/search/client/InventoryReferenceDataClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public interface InventoryReferenceDataClient {
2727
@Getter
2828
enum ReferenceDataType {
2929

30+
AUTHORITY_IDENTIFIER_TYPES("authority-identifier-types"),
3031
IDENTIFIER_TYPES("identifier-types"),
3132
ALTERNATIVE_TITLE_TYPES("alternative-title-types"),
3233
CALL_NUMBER_TYPES("call-number-types"),
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package org.folio.search.configuration.kafka;
2+
3+
import java.util.Collections;
4+
import lombok.RequiredArgsConstructor;
5+
import org.folio.search.model.event.InstanceSharingCompleteEvent;
6+
import org.springframework.boot.kafka.autoconfigure.KafkaProperties;
7+
import org.springframework.context.annotation.Bean;
8+
import org.springframework.context.annotation.Configuration;
9+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
10+
import org.springframework.kafka.listener.CommonErrorHandler;
11+
import org.springframework.kafka.support.serializer.JacksonJsonDeserializer;
12+
13+
@Configuration
14+
@RequiredArgsConstructor
15+
public class InstanceSharingCompleteEventKafkaConfiguration extends KafkaConfiguration {
16+
17+
private final KafkaProperties kafkaProperties;
18+
19+
@Bean
20+
public ConcurrentKafkaListenerContainerFactory<String, InstanceSharingCompleteEvent>
21+
instanceSharingCompleteListenerContainerFactory(CommonErrorHandler commonErrorHandler) {
22+
23+
var factory = new ConcurrentKafkaListenerContainerFactory<String, InstanceSharingCompleteEvent>();
24+
var deserializer = new JacksonJsonDeserializer<>(InstanceSharingCompleteEvent.class, false);
25+
factory.setConsumerFactory(getConsumerFactory(deserializer, kafkaProperties, Collections.emptyMap()));
26+
factory.setCommonErrorHandler(commonErrorHandler);
27+
return factory;
28+
}
29+
}

src/main/java/org/folio/search/integration/message/KafkaMessageListener.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,18 @@
1313
import java.util.stream.Collectors;
1414
import lombok.RequiredArgsConstructor;
1515
import lombok.extern.log4j.Log4j2;
16+
import org.apache.commons.lang3.StringUtils;
1617
import org.apache.commons.lang3.Strings;
1718
import org.apache.kafka.clients.consumer.ConsumerRecord;
1819
import org.apache.logging.log4j.message.FormattedMessage;
1920
import org.folio.search.domain.dto.ResourceEvent;
2021
import org.folio.search.model.event.IndexInstanceEvent;
22+
import org.folio.search.model.event.InstanceSharingCompleteEvent;
2123
import org.folio.search.model.types.ResourceType;
2224
import org.folio.search.service.ResourceService;
2325
import org.folio.search.service.config.ConfigSynchronizationService;
26+
import org.folio.search.service.consortium.ConsortiumTenantProvider;
27+
import org.folio.search.service.reindex.jdbc.CallNumberRepository;
2428
import org.folio.search.utils.KafkaConstants;
2529
import org.folio.search.utils.SearchConverterUtils;
2630
import org.folio.spring.service.SystemUserScopedExecutionService;
@@ -43,6 +47,8 @@ public class KafkaMessageListener {
4347
private final ConfigSynchronizationService configSynchronizationService;
4448
private final KafkaTemplate<String, IndexInstanceEvent> instanceEventProducer;
4549
private final InstanceEventMapper instanceEventMapper;
50+
private final CallNumberRepository callNumberRepository;
51+
private final ConsortiumTenantProvider consortiumTenantProvider;
4652

4753
/**
4854
* Handles instance events and indexes them by id.
@@ -172,6 +178,30 @@ public void handleLinkedDataEvents(List<ConsumerRecord<String, ResourceEvent>> c
172178
indexResources(batch, resourceService::indexResources);
173179
}
174180

181+
@KafkaListener(
182+
id = KafkaConstants.INSTANCE_SHARING_COMPLETE_LISTENER_ID,
183+
containerFactory = "instanceSharingCompleteListenerContainerFactory",
184+
groupId = "#{folioKafkaProperties.listener['instance-sharing-complete'].groupId}",
185+
concurrency = "#{folioKafkaProperties.listener['instance-sharing-complete'].concurrency}",
186+
topicPattern = "#{folioKafkaProperties.listener['instance-sharing-complete'].topicPattern}")
187+
public void handleInstanceSharingCompleteEvent(InstanceSharingCompleteEvent instanceSharingCompleteEvent) {
188+
log.info("Processing consortium instance sharing complete event from Kafka ");
189+
190+
if (InstanceSharingCompleteEvent.Status.COMPLETE.equals(instanceSharingCompleteEvent.getStatus())
191+
&& StringUtils.isEmpty(instanceSharingCompleteEvent.getError())) {
192+
193+
var tenant = instanceSharingCompleteEvent.getTargetTenantId();
194+
executionService.executeSystemUserScoped(tenant, () -> {
195+
if (consortiumTenantProvider.isCentralTenant(tenant)) {
196+
log.info("handleInstanceSharingCompleteEvent: Updating lastUpdatedDate for call numbers of instance "
197+
+ "in central tenant {}", tenant);
198+
callNumberRepository.updateLastUpdatedDate(instanceSharingCompleteEvent.getInstanceIdentifier());
199+
}
200+
return null;
201+
});
202+
}
203+
}
204+
175205
private void indexResources(List<ResourceEvent> batch, Consumer<List<ResourceEvent>> indexConsumer) {
176206
var batchByTenant = batch.stream().collect(Collectors.groupingBy(ResourceEvent::getTenant));
177207

src/main/java/org/folio/search/model/client/CqlQueryParam.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
public enum CqlQueryParam {
88

99
ID("id"),
10+
CODE("code"),
1011
NAME("name"),
1112
SOURCE("source"),
1213
HOLDINGS_ID("holdings.id");
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package org.folio.search.model.event;
2+
3+
import com.fasterxml.jackson.annotation.JsonValue;
4+
import java.util.UUID;
5+
import lombok.Data;
6+
import lombok.Getter;
7+
import lombok.RequiredArgsConstructor;
8+
9+
@Data
10+
public class InstanceSharingCompleteEvent {
11+
private UUID id;
12+
private String instanceIdentifier;
13+
private String sourceTenantId;
14+
private String targetTenantId;
15+
private Status status;
16+
private String error;
17+
18+
@Getter
19+
@RequiredArgsConstructor
20+
public enum Status {
21+
22+
IN_PROGRESS("IN_PROGRESS"),
23+
COMPLETE("COMPLETE"),
24+
ERROR("ERROR");
25+
26+
@JsonValue
27+
private final String value;
28+
}
29+
}

0 commit comments

Comments
 (0)