Skip to content

Commit 52b9503

Browse files
committed
using postgres native sql FOR UPDATE SKIP LOCKED, to assign workerid to resolve the issue with concurrent instances trying to acquire the jobs
1 parent d61830e commit 52b9503

File tree

6 files changed

+208
-274
lines changed

6 files changed

+208
-274
lines changed

task-manager-service/src/main/java/uk/gov/hmcts/cp/taskmanager/autoconfig/TaskManagerAutoConfiguration.java

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212

1313
import jakarta.persistence.EntityManager;
1414
import org.springframework.beans.factory.ObjectProvider;
15+
import org.springframework.beans.factory.annotation.Qualifier;
16+
import org.springframework.beans.factory.annotation.Value;
1517
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
1618
import org.springframework.boot.autoconfigure.AutoConfiguration;
1719
import org.springframework.boot.autoconfigure.AutoConfigurationPackages;
@@ -24,13 +26,70 @@
2426
import org.springframework.core.type.AnnotationMetadata;
2527
import org.springframework.data.jpa.repository.JpaRepository;
2628
import org.springframework.data.jpa.repository.support.JpaRepositoryFactory;
29+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
2730
import org.springframework.transaction.PlatformTransactionManager;
2831

2932
@AutoConfiguration
3033
@Import(TaskManagerAutoConfiguration.PersistencePackagesRegistrar.class)
3134
@ConditionalOnClass({EntityManager.class, JpaRepository.class, JpaRepositoryFactory.class, PlatformTransactionManager.class})
3235
public class TaskManagerAutoConfiguration {
3336

37+
/**
38+
* Polling interval in milliseconds for checking unassigned jobs.
39+
* Configurable via {@code job.executor.poll-interval} property.
40+
*/
41+
@Value("${job.executor.poll-interval:5000}")
42+
private long pollInterval;
43+
44+
/**
45+
* Core thread pool size.
46+
* Configurable via {@code job.executor.core-pool-size} property.
47+
*/
48+
@Value("${job.executor.core-pool-size:5}")
49+
private int corePoolSize;
50+
51+
/**
52+
* Maximum thread pool size.
53+
* Configurable via {@code job.executor.max-pool-size} property.
54+
*/
55+
@Value("${job.executor.max-pool-size:10}")
56+
private int maxPoolSize;
57+
58+
/**
59+
* Queue capacity for pending job executions.
60+
* Configurable via {@code job.executor.queue-capacity} property.
61+
*/
62+
@Value("${job.executor.queue-capacity:100}")
63+
private int queueCapacity;
64+
65+
/**
66+
* Thread name prefix for worker threads.
67+
* Configurable via {@code job.executor.thread-name-prefix} property.
68+
*/
69+
@Value("${job.executor.thread-name-prefix:job-executor-}")
70+
private String threadNamePrefix;
71+
72+
/**
73+
* Whether to wait for tasks to complete on shutdown.
74+
* Configurable via {@code job.executor.wait-for-tasks-on-shutdown} property.
75+
*/
76+
@Value("${job.executor.wait-for-tasks-on-shutdown:true}")
77+
private boolean waitForTasksOnShutdown;
78+
79+
/**
80+
* Maximum seconds to wait for task completion on shutdown.
81+
* Configurable via {@code job.executor.await-termination-seconds} property.
82+
*/
83+
@Value("${job.executor.await-termination-seconds:60}")
84+
private int awaitTerminationSeconds;
85+
86+
/**
87+
* Batch size for fetching unassigned jobs per polling cycle.
88+
* Configurable via {@code job.executor.batch-size} property.
89+
*/
90+
@Value("${job.executor.batch-size:50}")
91+
private int batchSize;
92+
3493
@Bean(name = "jobsRepository")
3594
@ConditionalOnMissingBean(name = "jobsRepository")
3695
public JobsRepository jobsRepository(final EntityManager entityManager) {
@@ -64,9 +123,11 @@ public TaskRegistry taskRegistry(final ObjectProvider<ExecutableTask> executable
64123
public JobExecutor jobExecutor(
65124
final JobService jobService,
66125
final TaskRegistry taskRegistry,
67-
final PlatformTransactionManager platformTransactionManager
126+
final PlatformTransactionManager platformTransactionManager,
127+
@Qualifier("jobExecutorThreadPool")
128+
final ThreadPoolTaskExecutor jobExecutorThreadPool
68129
) {
69-
return new JobExecutor(jobService, taskRegistry, platformTransactionManager);
130+
return new JobExecutor(jobService, taskRegistry, platformTransactionManager, jobExecutorThreadPool);
70131
}
71132

72133
@Bean
@@ -75,6 +136,19 @@ public ExecutionService executionService(final JobService jobService, final Task
75136
return new ExecutionService(jobService, taskRegistry);
76137
}
77138

139+
@Bean(name = "jobExecutorThreadPool")
140+
public ThreadPoolTaskExecutor jobExecutorThreadPool() {
141+
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
142+
executor.setCorePoolSize(corePoolSize);
143+
executor.setMaxPoolSize(maxPoolSize);
144+
executor.setQueueCapacity(queueCapacity);
145+
executor.setThreadNamePrefix(threadNamePrefix);
146+
executor.setWaitForTasksToCompleteOnShutdown(waitForTasksOnShutdown);
147+
executor.setAwaitTerminationSeconds(awaitTerminationSeconds);
148+
executor.initialize();
149+
return executor;
150+
}
151+
78152
static final class PersistencePackagesRegistrar implements ImportBeanDefinitionRegistrar {
79153
@Override
80154
public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {

task-manager-service/src/main/java/uk/gov/hmcts/cp/taskmanager/domain/executor/JobExecutor.java

Lines changed: 22 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package uk.gov.hmcts.cp.taskmanager.domain.executor;
22

3-
import org.springframework.transaction.annotation.Transactional;
43
import uk.gov.hmcts.cp.taskmanager.service.task.TaskRegistry;
54
import uk.gov.hmcts.cp.taskmanager.persistence.entity.Job;
65
import uk.gov.hmcts.cp.taskmanager.persistence.service.JobService;
@@ -19,19 +18,19 @@
1918

2019
/**
2120
* Main scheduler component that polls for unassigned jobs and assigns them to worker threads.
22-
*
21+
*
2322
* <p>The JobExecutor is the core component of the task management system. It periodically
2423
* polls the database for unassigned jobs that are ready to be executed and assigns them
2524
* to worker threads for processing.
26-
*
25+
*
2726
* <p><b>Key responsibilities:</b>
2827
* <ul>
2928
* <li>Periodically polling for unassigned jobs (via {@link Scheduled} annotation)</li>
3029
* <li>Assigning jobs to worker threads</li>
3130
* <li>Managing a thread pool for concurrent job execution</li>
3231
* <li>Handling job assignment failures and retry logic</li>
3332
* </ul>
34-
*
33+
*
3534
* <p><b>Configuration:</b>
3635
* <p>All thread pool and polling parameters are configurable via application properties:
3736
* <ul>
@@ -44,7 +43,7 @@
4443
* <li>{@code job.executor.await-termination-seconds} - Await termination seconds (default: 60)</li>
4544
* <li>{@code job.executor.batch-size} - Batch size for fetching jobs (default: 50)</li>
4645
* </ul>
47-
*
46+
*
4847
* <p><b>Execution flow:</b>
4948
* <ol>
5049
* <li>{@link #checkAndAssignJobs()} is called periodically by Spring's scheduler</li>
@@ -58,10 +57,10 @@
5857
* </li>
5958
* <li>Job execution is handled by {@link TaskExecutor} in a separate thread</li>
6059
* </ol>
61-
*
60+
*
6261
* <p>The thread pool is initialized on startup ({@link PostConstruct}) and shutdown
6362
* gracefully on application shutdown ({@link PreDestroy}).
64-
*
63+
*
6564
* @author Task Manager Service
6665
* @since 1.0.0
6766
* @see TaskExecutor
@@ -79,71 +78,22 @@ public class JobExecutor {
7978
* Service for job persistence operations.
8079
*/
8180
private final JobService jobService;
82-
81+
8382
/**
8483
* Registry for task lookup.
8584
*/
8685
private final TaskRegistry taskRegistry;
87-
86+
8887
/**
8988
* Transaction manager for transactional operations.
9089
*/
9190
private final PlatformTransactionManager transactionManager;
92-
91+
9392
/**
9493
* Thread pool executor for concurrent job execution.
9594
*/
9695
private final ThreadPoolTaskExecutor executor;
9796

98-
/**
99-
* Polling interval in milliseconds for checking unassigned jobs.
100-
* Configurable via {@code job.executor.poll-interval} property.
101-
*/
102-
@Value("${job.executor.poll-interval:5000}")
103-
private long pollInterval;
104-
105-
/**
106-
* Core thread pool size.
107-
* Configurable via {@code job.executor.core-pool-size} property.
108-
*/
109-
@Value("${job.executor.core-pool-size:5}")
110-
private int corePoolSize;
111-
112-
/**
113-
* Maximum thread pool size.
114-
* Configurable via {@code job.executor.max-pool-size} property.
115-
*/
116-
@Value("${job.executor.max-pool-size:10}")
117-
private int maxPoolSize;
118-
119-
/**
120-
* Queue capacity for pending job executions.
121-
* Configurable via {@code job.executor.queue-capacity} property.
122-
*/
123-
@Value("${job.executor.queue-capacity:100}")
124-
private int queueCapacity;
125-
126-
/**
127-
* Thread name prefix for worker threads.
128-
* Configurable via {@code job.executor.thread-name-prefix} property.
129-
*/
130-
@Value("${job.executor.thread-name-prefix:job-executor-}")
131-
private String threadNamePrefix;
132-
133-
/**
134-
* Whether to wait for tasks to complete on shutdown.
135-
* Configurable via {@code job.executor.wait-for-tasks-on-shutdown} property.
136-
*/
137-
@Value("${job.executor.wait-for-tasks-on-shutdown:true}")
138-
private boolean waitForTasksOnShutdown;
139-
140-
/**
141-
* Maximum seconds to wait for task completion on shutdown.
142-
* Configurable via {@code job.executor.await-termination-seconds} property.
143-
*/
144-
@Value("${job.executor.await-termination-seconds:60}")
145-
private int awaitTerminationSeconds;
146-
14797
/**
14898
* Batch size for fetching unassigned jobs per polling cycle.
14999
* Configurable via {@code job.executor.batch-size} property.
@@ -153,38 +103,23 @@ public class JobExecutor {
153103

154104
/**
155105
* Constructs a new JobExecutor with the specified dependencies.
156-
*
106+
*
157107
* @param jobService the job service for persistence operations, must not be null
158108
* @param taskRegistry the task registry for task lookup, must not be null
159109
* @param transactionManager the transaction manager, must not be null
160110
*/
161-
public JobExecutor(JobService jobService, TaskRegistry taskRegistry, PlatformTransactionManager transactionManager) {
111+
public JobExecutor(JobService jobService, TaskRegistry taskRegistry,
112+
PlatformTransactionManager transactionManager,
113+
ThreadPoolTaskExecutor jobExecutorThreadPool) {
162114
this.jobService = jobService;
163115
this.taskRegistry = taskRegistry;
164116
this.transactionManager = transactionManager;
165-
this.executor = new ThreadPoolTaskExecutor();
166-
}
167-
168-
/**
169-
* Initializes the thread pool executor with configured parameters.
170-
*
171-
* <p>This method is called after dependency injection completes. It configures
172-
* the thread pool with all the settings from application properties and initializes it.
173-
*/
174-
@PostConstruct
175-
public void init() {
176-
executor.setCorePoolSize(corePoolSize);
177-
executor.setMaxPoolSize(maxPoolSize);
178-
executor.setQueueCapacity(queueCapacity);
179-
executor.setThreadNamePrefix(threadNamePrefix);
180-
executor.setWaitForTasksToCompleteOnShutdown(waitForTasksOnShutdown);
181-
executor.setAwaitTerminationSeconds(awaitTerminationSeconds);
182-
executor.initialize();
117+
this.executor = jobExecutorThreadPool;
183118
}
184119

185120
/**
186121
* Shuts down the thread pool executor gracefully.
187-
*
122+
*
188123
* <p>This method is called when the application is shutting down. It ensures
189124
* that all running tasks complete before the executor is terminated.
190125
*/
@@ -195,7 +130,7 @@ public void destroy() {
195130

196131
/**
197132
* Periodically checks for unassigned jobs and assigns them to worker threads.
198-
*
133+
*
199134
* <p>This method is scheduled to run at fixed intervals (configurable via
200135
* {@code job.executor.poll-interval}). It:
201136
* <ol>
@@ -209,20 +144,19 @@ public void destroy() {
209144
* </ul>
210145
* </li>
211146
* </ol>
212-
*
147+
*
213148
* <p>Jobs are ordered by priority (ascending, 1 is highest) and then by start time
214149
* (ascending), ensuring high-priority jobs are processed first.
215-
*
150+
*
216151
* <p>If an error occurs during job assignment, the retry attempts are decremented
217152
* so the job can be retried in the next polling cycle.
218153
*/
219154
@Scheduled(fixedDelayString = "${job.executor.poll-interval:5000}")
220-
221155
public void checkAndAssignJobs() {
222156
try {
223157
logger.debug("Checking for unassigned jobs (batch size: {})...", batchSize);
224-
UUID workerId = UUID.randomUUID();
225-
List<Job> assignedJobs = jobService.assignJobsToWorkerBatch(workerId, ZonedDateTime.now(),ZonedDateTime.now(),batchSize);
158+
final UUID workerId = UUID.randomUUID();
159+
final List<Job> assignedJobs = jobService.assignJobsToWorkerBatch(workerId, ZonedDateTime.now(), ZonedDateTime.now(), batchSize);
226160

227161
if (assignedJobs.isEmpty()) {
228162
logger.debug("No unassigned jobs found");
@@ -252,11 +186,11 @@ public void checkAndAssignJobs() {
252186

253187
/**
254188
* Executes a job by creating a TaskExecutor and submitting it to the thread pool.
255-
*
189+
*
256190
* <p>This method creates a new {@link TaskExecutor} instance for the job and submits
257191
* it to the thread pool for execution. The TaskExecutor will handle the actual task
258192
* execution, including transaction management and error handling.
259-
*
193+
*
260194
* @param job the job to execute, must not be null and must have a worker assigned
261195
*/
262196
private void executeJob(Job job) {

0 commit comments

Comments
 (0)