Skip to content

Commit c6180bc

Browse files
committed
added test scenario - concurrent threads trying to acquire jobs
1 parent b988cd8 commit c6180bc

File tree

5 files changed

+97
-14
lines changed

5 files changed

+97
-14
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
}
77

88
group = 'uk.gov.hmcts.cp'
9-
version = System.getProperty('API_SPEC_VERSION') ?: '1.0.7'
9+
version = System.getProperty('API_SPEC_VERSION') ?: '1.0.0'
1010
description = 'Spring Job Scheduler'
1111

1212
def githubActor = project.findProperty("github.actor") ?: System.getenv("GITHUB_ACTOR")
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package uk.gov.hmcts.cp.taskmanager.integration;
2+
3+
import static java.util.UUID.randomUUID;
4+
import static java.util.concurrent.Executors.newFixedThreadPool;
5+
import static org.junit.jupiter.api.Assertions.assertEquals;
6+
7+
import uk.gov.hmcts.cp.taskmanager.integration.config.IntegrationTestConfiguration;
8+
import uk.gov.hmcts.cp.taskmanager.persistence.entity.Job;
9+
import uk.gov.hmcts.cp.taskmanager.persistence.service.JobService;
10+
11+
import java.time.ZonedDateTime;
12+
import java.util.UUID;
13+
import java.util.concurrent.CountDownLatch;
14+
import java.util.concurrent.CyclicBarrier;
15+
import java.util.concurrent.ExecutorService;
16+
import java.util.concurrent.TimeUnit;
17+
import java.util.concurrent.atomic.AtomicInteger;
18+
19+
import jakarta.json.Json;
20+
import org.junit.jupiter.api.Test;
21+
import org.springframework.beans.factory.annotation.Autowired;
22+
import org.springframework.test.annotation.DirtiesContext;
23+
import org.springframework.transaction.support.TransactionTemplate;
24+
25+
@IntegrationTestConfiguration
26+
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
27+
class JobConcurrencyIntegrationTest extends PostgresIntegrationTestBase {
28+
29+
@Autowired
30+
private JobService jobService;
31+
32+
@Autowired
33+
private TransactionTemplate transactionTemplate;
34+
35+
@Test
36+
void testReplicateRaceConditionWithPostgres() throws InterruptedException {
37+
// 1. Setup: Insert a single job into the real Postgres container
38+
final UUID jobId = randomUUID();
39+
transactionTemplate.executeWithoutResult(status -> {
40+
final Job job = new Job();
41+
job.setJobId(jobId);
42+
job.setAssignedTaskName("test-task");
43+
job.setJobData(Json.createObjectBuilder()
44+
.add("testKey", "testValue")
45+
.add("testNumber", 42)
46+
.build());
47+
job.setAssignedTaskStartTime(ZonedDateTime.now().minusMinutes(1));
48+
job.setPriority(1);
49+
job.setRetryAttemptsRemaining(3);
50+
jobService.insertJob(job);
51+
});
52+
53+
final int threadCount = 2;
54+
final ExecutorService executor = newFixedThreadPool(threadCount);
55+
final CyclicBarrier barrier = new CyclicBarrier(threadCount);
56+
final AtomicInteger successfulClaims = new AtomicInteger(0);
57+
final CountDownLatch latch = new CountDownLatch(threadCount);
58+
59+
// 2. Run two threads simultaneously
60+
for (int i = 0; i < threadCount; i++) {
61+
executor.submit(() -> {
62+
try {
63+
barrier.await(); // Sync threads to hit the DB at once
64+
65+
final var jobs = jobService.assignJobsToWorkerBatch(jobId, 10);
66+
if (!jobs.isEmpty()) {
67+
successfulClaims.incrementAndGet();
68+
}
69+
} catch (Exception e) {
70+
System.err.println("Thread failed: " + e.getMessage());
71+
} finally {
72+
latch.countDown();
73+
}
74+
});
75+
}
76+
77+
latch.await(10, TimeUnit.SECONDS);
78+
79+
// 3. Verification
80+
assertEquals(1, successfulClaims.get(),
81+
"RACE CONDITION DETECTED: Multiple threads claimed the same job!");
82+
}
83+
}

task-manager-service/src/main/java/uk/gov/hmcts/cp/taskmanager/domain/executor/JobExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public void checkAndAssignJobs() {
156156
try {
157157
logger.debug("Checking for unassigned jobs (batch size: {})...", batchSize);
158158
final UUID workerId = UUID.randomUUID();
159-
final List<Job> assignedJobs = jobService.assignJobsToWorkerBatch(workerId, ZonedDateTime.now(), ZonedDateTime.now(), batchSize);
159+
final List<Job> assignedJobs = jobService.assignJobsToWorkerBatch(workerId, batchSize);
160160

161161
if (assignedJobs.isEmpty()) {
162162
logger.debug("No unassigned jobs found");

task-manager-service/src/main/java/uk/gov/hmcts/cp/taskmanager/persistence/service/JobService.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public class JobService {
6565
* @param jsonObjectConverter the JSON object converter, must not be null
6666
*/
6767
@Autowired
68-
public JobService(JobsRepository jobsRepository, JsonObjectConverter jsonObjectConverter) {
68+
public JobService(final JobsRepository jobsRepository, final JsonObjectConverter jsonObjectConverter) {
6969
this.jobsRepository = jobsRepository;
7070
this.jsonObjectConverter = jsonObjectConverter;
7171
}
@@ -84,14 +84,14 @@ public JobService(JobsRepository jobsRepository, JsonObjectConverter jsonObjectC
8484
* ordered by priority and start time
8585
*/
8686
@Transactional
87-
public List<Job> getUnassignedJobs(int batchSize) {
87+
public List<Job> getUnassignedJobs(final int batchSize) {
8888
Pageable pageable = PageRequest.of(0, batchSize);
8989
return jobsRepository.findUnassignedJobsWithLimit(ZonedDateTime.now(), pageable);
9090
}
9191

9292
@Transactional
93-
public List<Job> assignJobsToWorkerBatch(UUID workerId,ZonedDateTime lockTime, ZonedDateTime currentTime,int batchSize) {
94-
return jobsRepository.assignJobsToWorkerBatch(workerId, lockTime, currentTime, batchSize);
93+
public List<Job> assignJobsToWorkerBatch(final UUID workerId, final int batchSize) {
94+
return jobsRepository.assignJobsToWorkerBatch(workerId, ZonedDateTime.now(), ZonedDateTime.now(), batchSize);
9595
}
9696

9797
/**
@@ -104,7 +104,7 @@ public List<Job> assignJobsToWorkerBatch(UUID workerId,ZonedDateTime lockTime, Z
104104
* @throws RuntimeException if the job is not found
105105
*/
106106
@Transactional
107-
public void decrementRetryAttempts(UUID jobId) {
107+
public void decrementRetryAttempts(final UUID jobId) {
108108
Job job = jobsRepository.findByJobId(jobId)
109109
.orElseThrow(() -> new RuntimeException("Job not found with id: " + jobId));
110110

@@ -122,7 +122,7 @@ public void decrementRetryAttempts(UUID jobId) {
122122
* @param job the job to insert, must not be null
123123
*/
124124
@Transactional
125-
public void insertJob(Job job) {
125+
public void insertJob(final Job job) {
126126
jobsRepository.save(job);
127127
}
128128

task-manager-service/src/test/java/uk/gov/hmcts/cp/taskmanager/domain/executor/JobExecutorTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ void testInit() {
8282

8383
@Test
8484
void testCheckAndAssignJobsWithNoJobs() {
85-
when(jobService.assignJobsToWorkerBatch(any(UUID.class), any(ZonedDateTime.class), any(ZonedDateTime.class), eq(50))).thenReturn(Collections.emptyList());
85+
when(jobService.assignJobsToWorkerBatch(any(UUID.class), eq(50))).thenReturn(Collections.emptyList());
8686

8787
jobExecutor.checkAndAssignJobs();
8888

@@ -103,20 +103,20 @@ void testCheckAndAssignJobsWithMultipleJobs() {
103103
);
104104
List<Job> jobs = Arrays.asList(testJob, job2);
105105

106-
when(jobService.assignJobsToWorkerBatch(any(UUID.class), any(ZonedDateTime.class), any(ZonedDateTime.class), eq(50)))
106+
when(jobService.assignJobsToWorkerBatch(any(UUID.class), eq(50)))
107107
.thenReturn(jobs);
108108

109109
jobExecutor.checkAndAssignJobs();
110110

111-
verify(jobService).assignJobsToWorkerBatch(any(UUID.class), any(ZonedDateTime.class), any(ZonedDateTime.class), eq(50));
111+
verify(jobService).assignJobsToWorkerBatch(any(UUID.class), eq(50));
112112
verify(executor, times(2)).execute(any());
113113
}
114114

115115
@Test
116116
void testCheckAndAssignJobsWithExecutionFailure() {
117117
List<Job> jobs = Collections.singletonList(testJob);
118118

119-
when(jobService.assignJobsToWorkerBatch(any(UUID.class), any(ZonedDateTime.class), any(ZonedDateTime.class), eq(50)))
119+
when(jobService.assignJobsToWorkerBatch(any(UUID.class), eq(50)))
120120
.thenReturn(jobs);
121121
doThrow(new RuntimeException("Executor failed")).when(executor).execute(any(Runnable.class));
122122

@@ -129,7 +129,7 @@ void testCheckAndAssignJobsWithExecutionFailure() {
129129
void testCheckAndAssignJobsWithDecrementFailure() {
130130
List<Job> jobs = Collections.singletonList(testJob);
131131

132-
when(jobService.assignJobsToWorkerBatch(any(UUID.class), any(ZonedDateTime.class), any(ZonedDateTime.class), eq(50)))
132+
when(jobService.assignJobsToWorkerBatch(any(UUID.class), eq(50)))
133133
.thenReturn(jobs);
134134
doThrow(new RuntimeException("Executor failed")).when(executor).execute(any(Runnable.class));
135135
doThrow(new RuntimeException("Decrement failed")).when(jobService).decrementRetryAttempts(any(UUID.class));
@@ -142,7 +142,7 @@ void testCheckAndAssignJobsWithDecrementFailure() {
142142

143143
@Test
144144
void testCheckAndAssignJobsWithServiceException() {
145-
when(jobService.assignJobsToWorkerBatch(any(), any(), any(), eq(50)))
145+
when(jobService.assignJobsToWorkerBatch(any(), eq(50)))
146146
.thenThrow(new RuntimeException("Service error"));
147147

148148
// Should not throw exception, should handle gracefully

0 commit comments

Comments
 (0)