Skip to content

Commit 259e948

Browse files
feat(instance-sharing): create a new listener for the CONSORTIUM_INSTANCE_SHARING_COMPLETE event to update the call number’s last_updated_date when an instance is shared to the central tenant
1 parent 4b46958 commit 259e948

File tree

13 files changed

+38
-151
lines changed

13 files changed

+38
-151
lines changed

NEWS.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
* Use unique consumer group per instance for inventory classification-type and call-number-type topics ([MSEARCH-1092](https://folio-org.atlassian.net/browse/MSEARCH-1092))
2323
* Remove identifiers from keyword alias for instance and authority mappings ([MSEARCH-1118](https://folio-org.atlassian.net/browse/MSEARCH-1118))
2424
* Change `all` query builder to use full term for multimatch search ([MSEARCH-1112](https://folio-org.atlassian.net/browse/MSEARCH-1112))
25-
* Create a new listener for the CONSORTIUM_INSTANCE_SHARING_COMPLETE event to update the call number’s tenantId instead of performing this update within InstanceChildrenResourceService.persistChildren ([MSEARCH-1168](https://folio-org.atlassian.net/browse/MSEARCH-1168))
25+
* Create a new listener for the CONSORTIUM_INSTANCE_SHARING_COMPLETE event to update the call number’s last_updated_date when an instance is shared to the central tenant ([MSEARCH-1168](https://folio-org.atlassian.net/browse/MSEARCH-1168))
2626
* **Authority Search**
2727
* Implement two-stage Kafka processing with event aggregation for instance indexing ([MSEARCH-1157](https://folio-org.atlassian.net/browse/MSEARCH-1157))
2828
* Separate LCCN and Canceled LCCN identifiers search to lccn and canceledLccn options ([MSEARCH-1066](https://folio-org.atlassian.net/browse/MSEARCH-1066))

README.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,8 +336,6 @@ and [Cross-cluster replication](https://docs.aws.amazon.com/opensearch-service/l
336336
| INSTANCE_CHILDREN_INDEX_DELAY_MS | 60000 | Defines the delay for scheduler that indexes subjects/contributors/classifications/call-numbers in a background |
337337
| SUB_RESOURCE_BATCH_SIZE | 100 | Defines number of sub-resources to process at a time during background indexing |
338338
| STALE_LOCK_THRESHOLD_MS | 600000 | Threshold to consider a sub-resource lock as stale and eligible for release |
339-
| TASK_SCHEDULER_POOL_SIZE | 2 | The number of threads to keep in the task scheduler pool. |
340-
| TASK_SCHEDULER_DELAY_MS | 70000 | The delay in ms for scheduled tasks. |
341339

342340
The module uses system user to communicate with other modules from Kafka consumers.
343341
For production deployments you MUST specify the password for this system user via env variable:

descriptors/ModuleDescriptor-template.json

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1256,16 +1256,6 @@
12561256
"value": "500",
12571257
"description": "The capacity of the queue."
12581258
},
1259-
{
1260-
"name": "TASK_SCHEDULER_POOL_SIZE",
1261-
"value": "2",
1262-
"description": "The number of threads to keep in the task scheduler pool."
1263-
},
1264-
{
1265-
"name": "TASK_SCHEDULER_DELAY_MS",
1266-
"value": "70000",
1267-
"description": "The delay in ms for scheduled tasks."
1268-
},
12691259
{
12701260
"name": "SEARCH_QUERY_TIMEOUT",
12711261
"value": "25s",

src/main/java/org/folio/search/configuration/TaskSchedulerConfiguration.java

Lines changed: 0 additions & 21 deletions
This file was deleted.

src/main/java/org/folio/search/configuration/properties/TaskSchedulerProperties.java

Lines changed: 0 additions & 26 deletions
This file was deleted.

src/main/java/org/folio/search/integration/message/KafkaMessageListener.java

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import static org.folio.search.utils.SearchConverterUtils.getResourceSource;
88
import static org.folio.search.utils.SearchUtils.SOURCE_CONSORTIUM_PREFIX;
99

10-
import java.time.Instant;
1110
import java.util.List;
1211
import java.util.Map;
1312
import java.util.function.Consumer;
@@ -19,7 +18,6 @@
1918
import org.apache.commons.lang3.Strings;
2019
import org.apache.kafka.clients.consumer.ConsumerRecord;
2120
import org.apache.logging.log4j.message.FormattedMessage;
22-
import org.folio.search.configuration.properties.TaskSchedulerProperties;
2321
import org.folio.search.domain.dto.ResourceEvent;
2422
import org.folio.search.model.event.IndexInstanceEvent;
2523
import org.folio.search.model.event.InstanceSharingCompleteEvent;
@@ -34,7 +32,6 @@
3432
import org.springframework.cache.annotation.CacheEvict;
3533
import org.springframework.kafka.annotation.KafkaListener;
3634
import org.springframework.kafka.core.KafkaTemplate;
37-
import org.springframework.scheduling.TaskScheduler;
3835
import org.springframework.stereotype.Component;
3936

4037
/**
@@ -53,8 +50,6 @@ public class KafkaMessageListener {
5350
private final InstanceEventMapper instanceEventMapper;
5451
private final CallNumberRepository callNumberRepository;
5552
private final ConsortiumTenantProvider consortiumTenantProvider;
56-
private final TaskSchedulerProperties taskSchedulerProperties;
57-
private final TaskScheduler instanceSharingCompleteTaskScheduler;
5853

5954
/**
6055
* Handles instance events and indexes them by id.
@@ -192,33 +187,30 @@ public void handleLinkedDataEvents(List<ConsumerRecord<String, ResourceEvent>> c
192187
topicPattern = "#{folioKafkaProperties.listener['instance-sharing-complete'].topicPattern}")
193188
public void handleInstanceSharingCompleteEvents(List<InstanceSharingCompleteEvent> instanceSharingCompleteEvents) {
194189
log.info("Processing instance sharing complete events from Kafka [number of events: {}]",
195-
instanceSharingCompleteEvents.size());
196-
// wait until instance children processing is completed and the call number records are available for update
197-
instanceSharingCompleteTaskScheduler.schedule(() -> {
198-
var batch = instanceSharingCompleteEvents.stream()
190+
instanceSharingCompleteEvents.size());
191+
192+
var batch = instanceSharingCompleteEvents.stream()
199193
.filter(event -> InstanceSharingCompleteEvent.Status.COMPLETE.equals(event.getStatus())
200-
&& StringUtils.isEmpty(event.getError()))
194+
&& StringUtils.isEmpty(event.getError()))
201195
.toList();
202196

203-
var batchByTenant = batch.stream()
197+
var batchByTenant = batch.stream()
204198
.collect(Collectors.groupingBy(
205-
InstanceSharingCompleteEvent::getTargetTenantId,
206-
Collectors.mapping(InstanceSharingCompleteEvent::getInstanceIdentifier, Collectors.toList())
199+
InstanceSharingCompleteEvent::getTargetTenantId,
200+
Collectors.mapping(InstanceSharingCompleteEvent::getInstanceIdentifier, Collectors.toList())
207201
));
208-
209-
updateTenantIdForCentralInstances(batch, batchByTenant);
210-
}, Instant.now().plusMillis(taskSchedulerProperties.getDelayMs()));
202+
updateCalNumbersLastUpdatedDate(batch, batchByTenant);
211203
}
212204

213-
private void updateTenantIdForCentralInstances(List<InstanceSharingCompleteEvent> batch,
214-
Map<String, List<String>> batchByTenant) {
205+
private void updateCalNumbersLastUpdatedDate(List<InstanceSharingCompleteEvent> batch,
206+
Map<String, List<String>> batchByTenant) {
215207
batchByTenant.forEach((tenant, instanceIdentifiers) -> executionService.executeSystemUserScoped(tenant, () -> {
216208
folioMessageBatchProcessor.consumeBatchWithFallback(batch, KAFKA_RETRY_TEMPLATE_NAME,
217209
event -> {
218210
if (consortiumTenantProvider.isCentralTenant(tenant)) {
219-
log.info("updateTenantIdForCentralInstances: Updating call number tenant_id for {} instances in "
220-
+ "central tenant {}", instanceIdentifiers.size(), tenant);
221-
callNumberRepository.updateTenantIdForCentralInstances(instanceIdentifiers, tenant);
211+
log.info("updateCalNumbersLastUpdatedDate: Updating lastUpdatedDate for call numbers of {} instances "
212+
+ "in central tenant {}", instanceIdentifiers.size(), tenant);
213+
callNumberRepository.updateLastUpdatedDate(instanceIdentifiers);
222214
}
223215
}, KafkaMessageListener::logFailedEvent);
224216
return null;

src/main/java/org/folio/search/service/reindex/jdbc/CallNumberRepository.java

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -76,17 +76,12 @@ WHERE item_id IN (%2$s) %3$s
7676
WHERE id IN (SELECT * FROM deleted_ids);
7777
""";
7878

79-
private static final String UPDATE_TENANT_FOR_CENTRAL_QUERY = """
80-
WITH updated_call_numbers AS (
81-
UPDATE %1$s.instance_call_number
82-
SET tenant_id = ?
83-
WHERE instance_id IN (%2$s)
84-
AND tenant_id != ?
85-
RETURNING call_number_id
86-
)
87-
UPDATE %1$s.call_number
79+
private static final String UPDATE_LAST_UPDATED_DATE_QUERY = """
80+
UPDATE %1$s.call_number cn
8881
SET last_updated_date = CURRENT_TIMESTAMP
89-
WHERE id IN (SELECT call_number_id FROM updated_call_numbers);
82+
FROM %1$s.instance_call_number icn
83+
WHERE icn.instance_id IN (%2$s)
84+
AND cn.id = icn.call_number_id;
9085
""";
9186

9287
private static final String SELECT_BY_UPDATED_QUERY = """
@@ -190,22 +185,15 @@ public void deleteByInstanceIds(List<String> itemIds, String tenantId) {
190185
}
191186

192187
/**
193-
* Updates tenant_id in instance_call_number records for the given instances to the central tenant.
194-
* Used when an instance is shared to the central tenant - the background job processes the newly created
195-
* central instance and this method updates any existing call number relations that still point to the member tenant.
196-
* For the updated relations - also updates last_updated_date in call_number table
197-
* to trigger reindexing of those call numbers.
188+
* Updates last_updated_date in call_number records for the given instances to trigger reindexing
189+
* of those call numbers when an instance is shared to the central tenant .
198190
*
199-
* @param instanceIds list of instance IDs whose call number relations should be updated
200-
* @param centralTenantId the central tenant ID to set
191+
* @param instanceIds list of instance IDs whose call number relations should be updated
201192
*/
202-
public void updateTenantIdForCentralInstances(List<String> instanceIds, String centralTenantId) {
203-
var sql = UPDATE_TENANT_FOR_CENTRAL_QUERY.formatted(JdbcUtils.getSchemaName(context),
193+
public void updateLastUpdatedDate(List<String> instanceIds) {
194+
var sql = UPDATE_LAST_UPDATED_DATE_QUERY.formatted(JdbcUtils.getSchemaName(context),
204195
getParamPlaceholderForUuid(instanceIds.size()));
205-
var params = Stream.of(List.of(centralTenantId), instanceIds, List.of(centralTenantId))
206-
.flatMap(List::stream)
207-
.toArray();
208-
jdbcTemplate.update(sql, params);
196+
jdbcTemplate.update(sql, instanceIds.toArray());
209197
}
210198

211199
@Override

src/main/resources/application-dev.yml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,6 @@ folio:
112112
max-pool-size: ${STREAM_ID_MAX_POOL_SIZE:2}
113113
queue-capacity: ${STREAM_ID_QUEUE_CAPACITY:500}
114114
job-expiration-days: ${STREAM_ID_JOB_EXPIRATION_DAYS:7}
115-
task-scheduler:
116-
pool-size: ${TASK_SCHEDULER_POOL_SIZE:2}
117-
thread-name-prefix: ${folio.environment}-task-scheduler-
118-
delay-ms: ${TASK_SCHEDULER_DELAY_MS:70000}
119115
kafka:
120116
retry-interval-ms: ${KAFKA_RETRY_INTERVAL_MS:2000}
121117
retry-delivery-attempts: ${KAFKA_RETRY_DELIVERY_ATTEMPTS:6}

src/main/resources/application.yml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,6 @@ folio:
110110
max-pool-size: ${STREAM_ID_MAX_POOL_SIZE:2}
111111
queue-capacity: ${STREAM_ID_QUEUE_CAPACITY:500}
112112
job-expiration-days: ${STREAM_ID_JOB_EXPIRATION_DAYS:7}
113-
task-scheduler:
114-
pool-size: ${TASK_SCHEDULER_POOL_SIZE:2}
115-
thread-name-prefix: ${folio.environment}-task-scheduler-
116-
delay-ms: ${TASK_SCHEDULER_DELAY_MS:70000}
117113
kafka:
118114
retry-interval-ms: ${KAFKA_RETRY_INTERVAL_MS:2000}
119115
retry-delivery-attempts: ${KAFKA_RETRY_DELIVERY_ATTEMPTS:6}

src/test/java/org/folio/indexing/IndexingInstanceCallNumberConsortiumIT.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ void tearDown() {
7777
void shouldUpdateInstanceCallNumber_onInstanceSharing() {
7878
// given
7979
createInstanceInMemberTenant(INSTANCE_ID, INSTANCE_TITLE, LOCATION_ID, CALL_NUMBER);
80-
awaitAssertion(() -> assertInstanceCallNumberTenantId(MEMBER_TENANT_ID, false));
80+
awaitAssertion(() -> assertInstanceCallNumberSharedState(false));
8181
// when - create instance in central tenant with the same instance id/title
8282
var centralInstance = new Instance().id(INSTANCE_ID).title(INSTANCE_TITLE).source("FOLIO");
8383
inventoryApi.createInstance(CENTRAL_TENANT_ID, centralInstance);
@@ -87,13 +87,13 @@ void shouldUpdateInstanceCallNumber_onInstanceSharing() {
8787
inventoryApi.updateInstance(MEMBER_TENANT_ID, memberInstance);
8888

8989
// then - fetch call number documents for the instance and check if tenant field changed to member tenant id
90-
awaitAssertion(() -> assertInstanceCallNumberTenantId(MEMBER_TENANT_ID, false));
90+
awaitAssertion(() -> assertInstanceCallNumberSharedState(false));
9191

9292
inventoryApi.shareInstance(CENTRAL_TENANT_ID, INSTANCE_ID, InstanceSharingCompleteEvent.Status.COMPLETE, "",
9393
MEMBER_TENANT_ID, CENTRAL_TENANT_ID);
9494

95-
// then - fetch call number documents and check if tenant field changed to central and shared is true
96-
awaitAssertion(() -> assertInstanceCallNumberTenantId(CENTRAL_TENANT_ID, true));
95+
// then - check that shared field is set to true
96+
awaitAssertion(() -> assertInstanceCallNumberSharedState(true));
9797
}
9898

9999
@ParameterizedTest(name = "{index} => status={0}, errorMessage={1}, targetTenant={2}")
@@ -107,25 +107,25 @@ void shouldNotUpdateInstanceCallNumber_onInvalidSharingEvent(
107107
// given
108108
createInstanceInMemberTenant(instanceId, title, locationId, callNumber);
109109

110-
awaitAssertion(() -> assertInstanceCallNumberTenantId(MEMBER_TENANT_ID, false));
110+
awaitAssertion(() -> assertInstanceCallNumberSharedState(false));
111111

112112
var centralInstance = new Instance().id(instanceId).title(title).source("FOLIO");
113113
inventoryApi.createInstance(CENTRAL_TENANT_ID, centralInstance);
114114

115115
var memberInstance = new Instance().id(instanceId).title(title).source("CONSORTIUM-FOLIO");
116116
inventoryApi.updateInstance(MEMBER_TENANT_ID, memberInstance);
117117

118-
awaitAssertion(() -> assertInstanceCallNumberTenantId(MEMBER_TENANT_ID, false));
118+
awaitAssertion(() -> assertInstanceCallNumberSharedState(false));
119119

120120
// when
121121
inventoryApi.shareInstance(MEMBER_TENANT_ID, instanceId, status, errorMessage, MEMBER_TENANT_ID, targetTenantId);
122122

123-
// then check that call number document is not updated with central tenant id and shared is false
123+
// then check that the shared field is not updated and remains false
124124
await()
125125
.pollDelay(Duration.ofSeconds(30))
126126
.atMost(ONE_MINUTE)
127127
.untilAsserted(() ->
128-
assertInstanceCallNumberTenantId(MEMBER_TENANT_ID, false)
128+
assertInstanceCallNumberSharedState(false)
129129
);
130130
}
131131

@@ -142,7 +142,7 @@ private static Stream<Arguments> negativeSharingScenarios() {
142142
UUID.randomUUID().toString(), "title3", UUID.randomUUID().toString(), "call number3"));
143143
}
144144

145-
private static void assertInstanceCallNumberTenantId(String expectedTenantId, boolean shared) {
145+
private static void assertInstanceCallNumberSharedState(boolean shared) {
146146
var hits = fetchAllDocuments(INSTANCE_CALL_NUMBER, CENTRAL_TENANT_ID);
147147
assertThat(hits).hasSize(1);
148148

@@ -152,7 +152,7 @@ private static void assertInstanceCallNumberTenantId(String expectedTenantId, bo
152152
assertThat(instances)
153153
.hasSize(1)
154154
.allSatisfy(map -> assertThat(map)
155-
.containsEntry("tenantId", expectedTenantId)
155+
.containsEntry("tenantId", MEMBER_TENANT_ID)
156156
.containsEntry("shared", shared));
157157
}
158158

0 commit comments

Comments
 (0)