Skip to content

Commit 57b4a5d

Browse files
refactor(instance-sharing): address review comments
- Use KafkaTemplate<String, Object> to support multiple event types - Configure Kafka JsonSerializer in @SpringBootTest to fix serialization issues - Remove unused metadata field from InstanceSharingCompleteEvent - Use single-message listener for instance sharing events
1 parent f92e0dc commit 57b4a5d

23 files changed

+134
-187
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ and [Cross-cluster replication](https://docs.aws.amazon.com/opensearch-service/l
293293
| KAFKA_AUTHORITIES_CONCURRENCY | 1 | Custom number of kafka concurrent threads for authority message consuming. |
294294
| KAFKA_LOCATION_CONCURRENCY | 1 | Custom number of kafka concurrent threads for inventory.location, inventory.campus, inventory.institution and inventory.library message consuming. |
295295
| KAFKA_LINKED_DATA_CONCURRENCY | 1 | Custom number of kafka concurrent threads for linked data message consuming. |
296-
| KAFKA_INSTANCE_SHARING_COMPLETE_CONCURRENCY | 1 | Custom number of kafka concurrent threads for instance sharing complete message consuming. |
296+
| KAFKA_CONSORTIUM_INSTANCE_SHARING_COMPLETE_CONCURRENCY | 1 | Custom number of kafka concurrent threads for consortium instance sharing complete message consuming. |
297297
| KAFKA_REINDEX_RANGE_INDEX_CONCURRENCY | 1 | Custom number of kafka concurrent threads for `search.reindex.range-index` message consuming. |
298298
| KAFKA_REINDEX_RANGE_INDEX_TOPIC_PARTITIONS | 16 | Amount of partitions for `search.reindex.range-index` topic. |
299299
| KAFKA_REINDEX_RANGE_INDEX_TOPIC_REPLICATION_FACTOR | - | Replication factor for `search.reindex.range-index` topic. |

descriptors/ModuleDescriptor-template.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1045,9 +1045,9 @@
10451045
"description": "Custom number of kafka concurrent threads for authority message consuming."
10461046
},
10471047
{
1048-
"name": "KAFKA_INSTANCE_SHARING_COMPLETE_CONCURRENCY",
1048+
"name": "KAFKA_CONSORTIUM_INSTANCE_SHARING_COMPLETE_CONCURRENCY",
10491049
"value": "1",
1050-
"description": "Custom number of kafka concurrent threads for instance sharing complete message consuming."
1050+
"description": "Custom number of kafka concurrent threads for consortium instance sharing complete message consuming."
10511051
},
10521052
{
10531053
"name": "KAFKA_REINDEX_RANGE_INDEX_CONCURRENCY",
Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
package org.folio.search.configuration.kafka;
22

3-
import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG;
4-
5-
import java.util.Map;
3+
import java.util.Collections;
64
import lombok.RequiredArgsConstructor;
75
import org.folio.search.model.event.InstanceSharingCompleteEvent;
86
import org.springframework.boot.kafka.autoconfigure.KafkaProperties;
97
import org.springframework.context.annotation.Bean;
108
import org.springframework.context.annotation.Configuration;
119
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
10+
import org.springframework.kafka.listener.CommonErrorHandler;
1211
import org.springframework.kafka.support.serializer.JacksonJsonDeserializer;
1312

1413
@Configuration
@@ -19,12 +18,12 @@ public class InstanceSharingCompleteEventKafkaConfiguration extends KafkaConfigu
1918

2019
@Bean
2120
public ConcurrentKafkaListenerContainerFactory<String, InstanceSharingCompleteEvent>
22-
instanceSharingCompletedListenerContainerFactory() {
21+
instanceSharingCompleteListenerContainerFactory(CommonErrorHandler commonErrorHandler) {
2322

2423
var factory = new ConcurrentKafkaListenerContainerFactory<String, InstanceSharingCompleteEvent>();
2524
var deserializer = new JacksonJsonDeserializer<>(InstanceSharingCompleteEvent.class, false);
26-
var overrideProperties = Map.<String, Object>of(MAX_POLL_RECORDS_CONFIG, 10);
27-
factory.setConsumerFactory(getConsumerFactory(deserializer, kafkaProperties, overrideProperties));
25+
factory.setConsumerFactory(getConsumerFactory(deserializer, kafkaProperties, Collections.emptyMap()));
26+
factory.setCommonErrorHandler(commonErrorHandler);
2827
return factory;
2928
}
3029
}

src/main/java/org/folio/search/integration/message/KafkaMessageListener.java

Lines changed: 17 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import static org.folio.search.utils.SearchUtils.SOURCE_CONSORTIUM_PREFIX;
99

1010
import java.util.List;
11-
import java.util.Map;
1211
import java.util.function.Consumer;
1312
import java.util.function.Predicate;
1413
import java.util.stream.Collectors;
@@ -181,40 +180,26 @@ public void handleLinkedDataEvents(List<ConsumerRecord<String, ResourceEvent>> c
181180

182181
@KafkaListener(
183182
id = KafkaConstants.INSTANCE_SHARING_COMPLETE_LISTENER_ID,
184-
containerFactory = "instanceSharingCompletedListenerContainerFactory",
183+
containerFactory = "instanceSharingCompleteListenerContainerFactory",
185184
groupId = "#{folioKafkaProperties.listener['instance-sharing-complete'].groupId}",
186185
concurrency = "#{folioKafkaProperties.listener['instance-sharing-complete'].concurrency}",
187186
topicPattern = "#{folioKafkaProperties.listener['instance-sharing-complete'].topicPattern}")
188-
public void handleInstanceSharingCompleteEvents(List<InstanceSharingCompleteEvent> instanceSharingCompleteEvents) {
189-
log.info("Processing instance sharing complete events from Kafka [number of events: {}]",
190-
instanceSharingCompleteEvents.size());
191-
192-
var batch = instanceSharingCompleteEvents.stream()
193-
.filter(event -> InstanceSharingCompleteEvent.Status.COMPLETE.equals(event.getStatus())
194-
&& StringUtils.isEmpty(event.getError()))
195-
.toList();
196-
197-
var batchByTenant = batch.stream()
198-
.collect(Collectors.groupingBy(
199-
InstanceSharingCompleteEvent::getTargetTenantId,
200-
Collectors.mapping(InstanceSharingCompleteEvent::getInstanceIdentifier, Collectors.toList())
201-
));
202-
updateCalNumbersLastUpdatedDate(batch, batchByTenant);
203-
}
204-
205-
private void updateCalNumbersLastUpdatedDate(List<InstanceSharingCompleteEvent> batch,
206-
Map<String, List<String>> batchByTenant) {
207-
batchByTenant.forEach((tenant, instanceIdentifiers) -> executionService.executeSystemUserScoped(tenant, () -> {
208-
folioMessageBatchProcessor.consumeBatchWithFallback(batch, KAFKA_RETRY_TEMPLATE_NAME,
209-
event -> {
210-
if (consortiumTenantProvider.isCentralTenant(tenant)) {
211-
log.info("updateCalNumbersLastUpdatedDate: Updating lastUpdatedDate for call numbers of {} instances "
212-
+ "in central tenant {}", instanceIdentifiers.size(), tenant);
213-
callNumberRepository.updateLastUpdatedDate(instanceIdentifiers);
214-
}
215-
}, KafkaMessageListener::logFailedEvent);
216-
return null;
217-
}));
187+
public void handleInstanceSharingCompleteEvent(InstanceSharingCompleteEvent instanceSharingCompleteEvent) {
188+
log.info("Processing consortium instance sharing complete event from Kafka ");
189+
190+
if (InstanceSharingCompleteEvent.Status.COMPLETE.equals(instanceSharingCompleteEvent.getStatus())
191+
&& StringUtils.isEmpty(instanceSharingCompleteEvent.getError())) {
192+
193+
var tenant = instanceSharingCompleteEvent.getTargetTenantId();
194+
executionService.executeSystemUserScoped(tenant, () -> {
195+
if (consortiumTenantProvider.isCentralTenant(tenant)) {
196+
log.info("handleInstanceSharingCompleteEvent: Updating lastUpdatedDate for call numbers of instance "
197+
+ "in central tenant {}", tenant);
198+
callNumberRepository.updateLastUpdatedDate(instanceSharingCompleteEvent.getInstanceIdentifier());
199+
}
200+
return null;
201+
});
202+
}
218203
}
219204

220205
private void indexResources(List<ResourceEvent> batch, Consumer<List<ResourceEvent>> indexConsumer) {
@@ -252,15 +237,4 @@ private static void logFailedEvent(IndexInstanceEvent event, Exception e) {
252237
event.tenant(), event.instanceId()
253238
), e);
254239
}
255-
256-
private static void logFailedEvent(InstanceSharingCompleteEvent event, Exception e) {
257-
if (event == null) {
258-
log.warn("Failed to update tenantId for instance sharing complete event [event: null]", e);
259-
return;
260-
}
261-
log.warn(new FormattedMessage(
262-
"Failed to update tenantId for instance sharing complete event [targetTenantId: {}, instanceId: {}]",
263-
event.getTargetTenantId(), event.getInstanceIdentifier()
264-
), e);
265-
}
266240
}

src/main/java/org/folio/search/model/event/InstanceSharingCompleteEvent.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ public class InstanceSharingCompleteEvent {
1414
private String targetTenantId;
1515
private Status status;
1616
private String error;
17-
private Object metadata;
1817

1918
@Getter
2019
@RequiredArgsConstructor

src/main/java/org/folio/search/service/reindex/jdbc/CallNumberRepository.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ WHERE item_id IN (%2$s) %3$s
8080
UPDATE %1$s.call_number cn
8181
SET last_updated_date = CURRENT_TIMESTAMP
8282
FROM %1$s.instance_call_number icn
83-
WHERE icn.instance_id IN (%2$s)
83+
WHERE icn.instance_id = ?::uuid
8484
AND cn.id = icn.call_number_id;
8585
""";
8686

@@ -185,16 +185,14 @@ public void deleteByInstanceIds(List<String> itemIds, String tenantId) {
185185
}
186186

187187
/**
188-
* Updates last_updated_date in call_number records for the given instances to trigger reindexing
188+
* Updates last_updated_date in call_number records for the given instance to trigger reindexing
189189
* of those call numbers when an instance is shared to the central tenant .
190190
*
191-
* @param instanceIds list of instance IDs whose call number relations should be updated
191+
* @param instanceId instance ID whose call number relations should be updated
192192
*/
193-
@SuppressWarnings("java:S2077")
194-
public void updateLastUpdatedDate(List<String> instanceIds) {
195-
var sql = UPDATE_LAST_UPDATED_DATE_QUERY.formatted(JdbcUtils.getSchemaName(context),
196-
getParamPlaceholderForUuid(instanceIds.size()));
197-
jdbcTemplate.update(sql, instanceIds.toArray());
193+
public void updateLastUpdatedDate(String instanceId) {
194+
var sql = UPDATE_LAST_UPDATED_DATE_QUERY.formatted(JdbcUtils.getSchemaName(context));
195+
jdbcTemplate.update(sql, instanceId);
198196
}
199197

200198
@Override

src/main/resources/application-dev.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,9 +151,9 @@ folio:
151151
topic-pattern: (${folio.environment}\.)(.*\.)linked-data\.(instance|work|hub)
152152
group-id: ${folio.environment}-mod-search-linked-data-group
153153
instance-sharing-complete:
154-
concurrency: ${KAFKA_INSTANCE_SHARING_COMPLETE_CONCURRENCY:1}
154+
concurrency: ${KAFKA_CONSORTIUM_INSTANCE_SHARING_COMPLETE_CONCURRENCY:1}
155155
topic-pattern: (${folio.environment}\.)[A-Za-z0-9-]+\.\w+\.CONSORTIUM_INSTANCE_SHARING_COMPLETE
156-
group-id: ${folio.environment}-mod-search-instance-sharing-complete-group
156+
group-id: ${folio.environment}-mod-search-consortium-instance-sharing-complete-group
157157
reindex-range-index:
158158
concurrency: ${KAFKA_REINDEX_RANGE_INDEX_CONCURRENCY:2}
159159
topic-pattern: (${folio.environment}\.)(.*\.)search\.reindex\.range-index

src/main/resources/application.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,9 @@ folio:
149149
topic-pattern: (${folio.environment}\.)(.*\.)linked-data\.(instance|work|hub)
150150
group-id: ${folio.environment}-mod-search-linked-data-group
151151
instance-sharing-complete:
152-
concurrency: ${KAFKA_INSTANCE_SHARING_COMPLETE_CONCURRENCY:1}
152+
concurrency: ${KAFKA_CONSORTIUM_INSTANCE_SHARING_COMPLETE_CONCURRENCY:1}
153153
topic-pattern: (${folio.environment}\.)[A-Za-z0-9-]+\.\w+\.CONSORTIUM_INSTANCE_SHARING_COMPLETE
154-
group-id: ${folio.environment}-mod-search-instance-sharing-complete-group
154+
group-id: ${folio.environment}-mod-search-consortium-instance-sharing-complete-group
155155
reindex-range-index:
156156
concurrency: ${KAFKA_REINDEX_RANGE_INDEX_CONCURRENCY:2}
157157
topic-pattern: (${folio.environment}\.)(.*\.)search\.reindex\.range-index

src/test/java/org/folio/api/config/ConfigIT.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -367,21 +367,19 @@ void referenceDataCacheInvalidates_whenEventReceived(String topic, String resour
367367
referenceDataCache.put(cacheKey, UUID.randomUUID());
368368
assertThat(referenceDataCache.get(cacheKey)).isNotNull();
369369

370-
kafkaTemplate.send(getTopicName(topic), randomId(),
371-
objectMapper.writeValueAsString(new ResourceEvent().resourceName(resource)));
370+
kafkaTemplate.send(getTopicName(topic), randomId(), new ResourceEvent().resourceName(resource));
372371

373372
await().atMost(ONE_MINUTE).pollInterval(TWO_SECONDS)
374373
.untilAsserted(() -> assertThat(referenceDataCache.get(cacheKey)).isNull());
375374
}
376375

377376
private void sendDeleteEvent(UUID typeId, String topic, ResourceType resourceType) {
378-
kafkaTemplate.send(topic, typeId.toString(),
379-
objectMapper.writeValueAsString(new ResourceEvent()
380-
.type(ResourceEventType.DELETE)
381-
.tenant(TENANT_ID)
382-
.resourceName(resourceType.getName())
383-
.old(mapOf(ID_FIELD, typeId.toString()))
384-
));
377+
kafkaTemplate.send(topic, typeId.toString(), new ResourceEvent()
378+
.type(ResourceEventType.DELETE)
379+
.tenant(TENANT_ID)
380+
.resourceName(resourceType.getName())
381+
.old(mapOf(ID_FIELD, typeId.toString()))
382+
);
385383
}
386384

387385
@SneakyThrows

src/test/java/org/folio/api/consortiumsearch/ConsortiumSearchCampusesIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,7 @@ private static void saveCampusRecords() {
127127
.flatMap(campus -> Stream.of(
128128
kafkaResourceEvent(CENTRAL_TENANT_ID, CREATE, campus, null),
129129
kafkaResourceEvent(MEMBER_TENANT_ID, CREATE, campus, null)))
130-
.forEach(event -> kafkaTemplate.send(inventoryCampusTopic(event.getTenant()),
131-
objectMapper.writeValueAsString(event)));
130+
.forEach(event -> kafkaTemplate.send(inventoryCampusTopic(event.getTenant()), event));
132131

133132
await().atMost(ONE_MINUTE).pollInterval(ONE_SECOND).untilAsserted(() -> {
134133
var totalHits = countIndexDocument(CAMPUS, CENTRAL_TENANT_ID);

0 commit comments

Comments
 (0)