Skip to content

Commit cf6ecb6

Browse files
add taskScheduler for handleInstanceSharingCompleteEvents listener
1 parent 418a03d commit cf6ecb6

File tree

12 files changed

+92
-20
lines changed

12 files changed

+92
-20
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@ and [Cross-cluster replication](https://docs.aws.amazon.com/opensearch-service/l
293293
| KAFKA_AUTHORITIES_CONCURRENCY | 1 | Custom number of kafka concurrent threads for authority message consuming. |
294294
| KAFKA_LOCATION_CONCURRENCY | 1 | Custom number of kafka concurrent threads for inventory.location, inventory.campus, inventory.institution and inventory.library message consuming. |
295295
| KAFKA_LINKED_DATA_CONCURRENCY | 1 | Custom number of kafka concurrent threads for linked data message consuming. |
296+
| KAFKA_INSTANCE_SHARING_COMPLETE_CONCURRENCY | 1 | Custom number of kafka concurrent threads for instance sharing complete message consuming. |
296297
| KAFKA_REINDEX_RANGE_INDEX_CONCURRENCY | 1 | Custom number of kafka concurrent threads for `search.reindex.range-index` message consuming. |
297298
| KAFKA_REINDEX_RANGE_INDEX_TOPIC_PARTITIONS | 16 | Amount of partitions for `search.reindex.range-index` topic. |
298299
| KAFKA_REINDEX_RANGE_INDEX_TOPIC_REPLICATION_FACTOR | - | Replication factor for `search.reindex.range-index` topic. |
@@ -335,6 +336,8 @@ and [Cross-cluster replication](https://docs.aws.amazon.com/opensearch-service/l
335336
| INSTANCE_CHILDREN_INDEX_DELAY_MS | 60000 | Defines the delay for scheduler that indexes subjects/contributors/classifications/call-numbers in a background |
336337
| SUB_RESOURCE_BATCH_SIZE | 100 | Defines number of sub-resources to process at a time during background indexing |
337338
| STALE_LOCK_THRESHOLD_MS | 600000 | Threshold to consider a sub-resource lock as stale and eligible for release |
339+
| TASK_SCHEDULER_POOL_SIZE | 2 | The number of threads to keep in the task scheduler pool. |
340+
| TASK_SCHEDULER_DELAY_MS | 70000 | The delay in ms for scheduled tasks. |
338341

339342
The module uses system user to communicate with other modules from Kafka consumers.
340343
For production deployments you MUST specify the password for this system user via env variable:

descriptors/ModuleDescriptor-template.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1251,6 +1251,16 @@
12511251
"value": "500",
12521252
"description": "The capacity of the queue."
12531253
},
1254+
{
1255+
"name": "TASK_SCHEDULER_POOL_SIZE",
1256+
"value": "2",
1257+
"description": "The number of threads to keep in the task scheduler pool."
1258+
},
1259+
{
1260+
"name": "TASK_SCHEDULER_DELAY_MS",
1261+
"value": "70000",
1262+
"description": "The delay in ms for scheduled tasks."
1263+
},
12541264
{
12551265
"name": "SEARCH_QUERY_TIMEOUT",
12561266
"value": "25s",

docker/kafka-init.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ TOPICS=(
2626
"${ENV}.Default.linked-data.hub"
2727
"${ENV}.Default.search.reindex.range-index"
2828
"${ENV}.Default.inventory.reindex-records"
29+
"${ENV}.Default.ALL.CONSORTIUM_INSTANCE_SHARING_COMPLETE"
2930
)
3031

3132
# Updated to use the full path to kafka-topics.sh
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.folio.search.configuration;
2+
3+
import lombok.RequiredArgsConstructor;
4+
import org.folio.search.configuration.properties.TaskSchedulerProperties;
5+
import org.springframework.context.annotation.Bean;
6+
import org.springframework.context.annotation.Configuration;
7+
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
8+
9+
@Configuration
10+
@RequiredArgsConstructor
11+
public class TaskSchedulerConfiguration {
12+
13+
@Bean
14+
public ThreadPoolTaskScheduler instanceSharingCompleteTaskScheduler(TaskSchedulerProperties properties) {
15+
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
16+
scheduler.setPoolSize(properties.getPoolSize());
17+
scheduler.setThreadNamePrefix(properties.getThreadNamePrefix());
18+
scheduler.initialize();
19+
return scheduler;
20+
}
21+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.folio.search.configuration.properties;
2+
3+
import lombok.Data;
4+
import org.springframework.boot.context.properties.ConfigurationProperties;
5+
import org.springframework.context.annotation.Configuration;
6+
7+
@Data
8+
@Configuration
9+
@ConfigurationProperties("folio.task-scheduler")
10+
public class TaskSchedulerProperties {
11+
12+
/**
13+
* The delay in milliseconds before the task scheduler starts executing tasks.
14+
*/
15+
private long delayMs = 70000L;
16+
17+
/**
18+
* The size of the thread pool for task scheduling.
19+
*/
20+
private int poolSize = 2;
21+
22+
/**
23+
* The prefix for the names of threads in the task scheduler.
24+
*/
25+
private String threadNamePrefix = "mod-search-task-scheduler-";
26+
}

src/main/java/org/folio/search/integration/message/KafkaMessageListener.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.apache.commons.lang3.Strings;
2020
import org.apache.kafka.clients.consumer.ConsumerRecord;
2121
import org.apache.logging.log4j.message.FormattedMessage;
22+
import org.folio.search.configuration.properties.TaskSchedulerProperties;
2223
import org.folio.search.domain.dto.ResourceEvent;
2324
import org.folio.search.model.event.IndexInstanceEvent;
2425
import org.folio.search.model.event.InstanceSharingCompleteEvent;
@@ -52,7 +53,8 @@ public class KafkaMessageListener {
5253
private final InstanceEventMapper instanceEventMapper;
5354
private final CallNumberRepository callNumberRepository;
5455
private final ConsortiumTenantProvider consortiumTenantProvider;
55-
private TaskScheduler taskScheduler;
56+
private final TaskSchedulerProperties taskSchedulerProperties;
57+
private final TaskScheduler instanceSharingCompleteTaskScheduler;
5658

5759
/**
5860
* Handles instance events and indexes them by id.
@@ -192,20 +194,20 @@ public void handleInstanceSharingCompleteEvents(List<InstanceSharingCompleteEven
192194
log.info("Processing instance sharing complete events from Kafka [number of events: {}]",
193195
instanceSharingCompleteEvents.size());
194196
// wait until instance children processing is completed and the call number records are available for update
195-
taskScheduler.schedule(() -> {
197+
instanceSharingCompleteTaskScheduler.schedule(() -> {
196198
var batch = instanceSharingCompleteEvents.stream()
197199
.filter(event -> InstanceSharingCompleteEvent.Status.COMPLETE.equals(event.getStatus())
198200
&& StringUtils.isEmpty(event.getError()))
199201
.toList();
202+
200203
var batchByTenant = batch.stream()
201204
.collect(Collectors.groupingBy(
202205
InstanceSharingCompleteEvent::getTargetTenantId,
203206
Collectors.mapping(InstanceSharingCompleteEvent::getInstanceIdentifier, Collectors.toList())
204207
));
205-
log.info("handleInstanceSharingCompleteEvents: Grouped {} instance sharing complete events by tenant",
206-
batch.size());
208+
207209
updateTenantIdForCentralInstances(batch, batchByTenant);
208-
}, Instant.now().plusSeconds(70));
210+
}, Instant.now().plusMillis(taskSchedulerProperties.getDelayMs()));
209211
}
210212

211213
private void updateTenantIdForCentralInstances(List<InstanceSharingCompleteEvent> batch,
@@ -214,11 +216,9 @@ private void updateTenantIdForCentralInstances(List<InstanceSharingCompleteEvent
214216
folioMessageBatchProcessor.consumeBatchWithFallback(batch, KAFKA_RETRY_TEMPLATE_NAME,
215217
event -> {
216218
if (consortiumTenantProvider.isCentralTenant(tenant)) {
217-
log.info("updateTenantIdForCentralInstances: Updating call number tenant_id for {} instances "
218-
+ "in central tenant {}", instanceIdentifiers.size(), tenant);
219+
log.info("updateTenantIdForCentralInstances: Updating call number tenant_id for {} instances in "
220+
+ "central tenant {}", instanceIdentifiers.size(), tenant);
219221
callNumberRepository.updateTenantIdForCentralInstances(instanceIdentifiers, tenant);
220-
} else {
221-
log.info("updateTenantIdForCentralInstances: Skipping tenant {} as it is not a central tenant", tenant);
222222
}
223223
}, KafkaMessageListener::logFailedEvent);
224224
return null;

src/main/resources/application-dev.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,10 @@ folio:
112112
max-pool-size: ${STREAM_ID_MAX_POOL_SIZE:2}
113113
queue-capacity: ${STREAM_ID_QUEUE_CAPACITY:500}
114114
job-expiration-days: ${STREAM_ID_JOB_EXPIRATION_DAYS:7}
115+
task-scheduler:
116+
pool-size: ${TASK_SCHEDULER_POOL_SIZE:2}
117+
thread-name-prefix: ${folio.environment}-task-scheduler-
118+
delay-ms: ${TASK_SCHEDULER_DELAY_MS:70000}
115119
kafka:
116120
retry-interval-ms: ${KAFKA_RETRY_INTERVAL_MS:2000}
117121
retry-delivery-attempts: ${KAFKA_RETRY_DELIVERY_ATTEMPTS:6}

src/main/resources/application.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,10 @@ folio:
110110
max-pool-size: ${STREAM_ID_MAX_POOL_SIZE:2}
111111
queue-capacity: ${STREAM_ID_QUEUE_CAPACITY:500}
112112
job-expiration-days: ${STREAM_ID_JOB_EXPIRATION_DAYS:7}
113+
task-scheduler:
114+
pool-size: ${TASK_SCHEDULER_POOL_SIZE:2}
115+
thread-name-prefix: ${folio.environment}-task-scheduler-
116+
delay-ms: ${TASK_SCHEDULER_DELAY_MS:70000}
113117
kafka:
114118
retry-interval-ms: ${KAFKA_RETRY_INTERVAL_MS:2000}
115119
retry-delivery-attempts: ${KAFKA_RETRY_DELIVERY_ATTEMPTS:6}

src/test/java/org/folio/indexing/IndexingInstanceCallNumberConsortiumIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import static org.folio.support.base.ApiEndpoints.instanceSearchPath;
1414
import static org.folio.support.utils.TestUtils.randomId;
1515

16-
import java.time.Duration;
1716
import java.util.List;
1817
import java.util.Map;
1918
import org.folio.search.domain.dto.Holding;
@@ -81,7 +80,7 @@ void shouldUpdateInstanceCallNumber_onInstanceSharing() {
8180
inventoryApi.shareInstance(CENTRAL_TENANT_ID, INSTANCE_ID);
8281

8382
// then - fetch call number documents and check if tenant field changed to central and shared is true
84-
awaitAssertion(() -> assertInstanceCallNumberTenantId(CENTRAL_TENANT_ID, true), Duration.ofSeconds(80));
83+
awaitAssertion(() -> assertInstanceCallNumberTenantId(CENTRAL_TENANT_ID, true));
8584
}
8685

8786
private static void assertInstanceCallNumberTenantId(String expectedTenantId, boolean shared) {

src/test/java/org/folio/search/integration/KafkaMessageListenerIT.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,11 @@
3131
import org.apache.kafka.clients.producer.ProducerRecord;
3232
import org.apache.kafka.common.serialization.StringSerializer;
3333
import org.folio.search.configuration.RetryTemplateConfiguration;
34+
import org.folio.search.configuration.TaskSchedulerConfiguration;
3435
import org.folio.search.configuration.kafka.InstanceResourceEventKafkaConfiguration;
36+
import org.folio.search.configuration.kafka.InstanceSharingCompleteEventKafkaConfiguration;
3537
import org.folio.search.configuration.kafka.ResourceEventKafkaConfiguration;
38+
import org.folio.search.configuration.properties.TaskSchedulerProperties;
3639
import org.folio.search.domain.dto.ResourceEvent;
3740
import org.folio.search.integration.KafkaMessageListenerIT.KafkaListenerTestConfiguration;
3841
import org.folio.search.integration.message.FolioMessageBatchProcessor;
@@ -71,6 +74,7 @@
7174
import org.springframework.kafka.test.context.EmbeddedKafka;
7275
import org.springframework.kafka.test.utils.KafkaTestUtils;
7376
import org.springframework.resilience.annotation.EnableResilientMethods;
77+
import org.springframework.scheduling.TaskScheduler;
7478
import org.springframework.test.context.bean.override.mockito.MockitoBean;
7579

7680
@Log4j2
@@ -112,6 +116,8 @@ class KafkaMessageListenerIT {
112116
private CallNumberRepository callNumberRepository;
113117
@MockitoBean
114118
private ConsortiumTenantProvider consortiumTenantProvider;
119+
@MockitoBean
120+
private TaskScheduler instanceSharingCompleteTaskScheduler;
115121
@Captor
116122
private ArgumentCaptor<ProducerRecord<String, IndexInstanceEvent>> producerRecordCaptor;
117123

@@ -207,7 +213,9 @@ private static ResourceEvent instanceEvent() {
207213
@Import({
208214
InstanceResourceEventKafkaConfiguration.class, ResourceEventKafkaConfiguration.class,
209215
KafkaAutoConfiguration.class, FolioMessageBatchProcessor.class,
210-
RetryTemplateConfiguration.class, ResourceEventBatchInterceptor.class
216+
RetryTemplateConfiguration.class, ResourceEventBatchInterceptor.class,
217+
InstanceSharingCompleteEventKafkaConfiguration.class, TaskSchedulerConfiguration.class,
218+
TaskSchedulerProperties.class
211219
})
212220
static class KafkaListenerTestConfiguration {
213221

0 commit comments

Comments
 (0)