46 changes: 23 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ A Spring Boot-based distributed job scheduling and execution system that provide
## Features

- **REST API** to create workflow and one-off jobs
- **PostgreSQL database** persistence with Liquibase schema management
- **PostgreSQL database** persistence with Flyway schema management
- **Automatic job executor** that polls every 5 seconds (configurable) for unassigned jobs
- **Worker thread pool** for concurrent job execution (configurable pool size)
- **Priority-based scheduling** (1-10, where 1 is highest priority)
Expand All @@ -20,7 +20,7 @@ This is a multi-module Gradle project:

- **`task-manager-service`**: Core job scheduling and execution framework
- **`example-application`**: Example Spring Boot application demonstrating usage
- **`jobstore-liquibase`**: Database schema management via Liquibase changesets
- **`jobstore-flyway`**: Database schema management via Flyway

## Prerequisites

Expand Down Expand Up @@ -68,35 +68,35 @@ spring.datasource.username=postgres
spring.datasource.password=postgres
```

### Running Liquibase Migrations Manually
### Running Flyway Migrations Manually

**Note**: Liquibase migrations run automatically when the Spring Boot application starts. However, if you need to run migrations manually (e.g., for database setup before starting the application), you can use the provided script:
**Note**: Flyway migrations run automatically when the Spring Boot application starts. However, if you need to run migrations manually (e.g., for database setup before starting the application), you can use the provided script:

1. **Using the provided script** (requires Liquibase CLI):
1. **Using the provided script** (requires Flyway CLI):
```bash
# Make the script executable (if not already)
chmod +x run-liquibase.sh
chmod +x run-flyway.sh

# Run the script
./run-liquibase.sh
./run-flyway.sh
```

The script will:
- Connect to the PostgreSQL database using credentials from `application.properties`
- Run all pending Liquibase migrations
- Run all pending Flyway migrations
- Display the migration status

2. **Installing Liquibase CLI** (if not already installed):
- **macOS**: `brew install liquibase`
- **Linux/Windows**: Download from [https://www.liquibase.org/download](https://www.liquibase.org/download)
2. **Installing Flyway CLI** (if not already installed):
- **macOS**: `brew install flyway`
- **Linux/Windows**: `curl -L https://repo1.maven.org/maven2/org/flywaydb/flyway-commandline/11.14.1/flyway-commandline-11.14.1-linux-x64.tar.gz | tar xz`

3. **Alternative**: If Liquibase CLI is not available, migrations will run automatically when you start the Spring Boot application. Simply start the application and Liquibase will apply any pending migrations.
3. **Alternative**: If Flyway CLI is not available, migrations will run automatically when you start the Spring Boot application. Simply start the application and Flyway will apply any pending migrations.

**Database Connection Details** (used by `run-liquibase.sh`):
**Database Connection Details** (used by `run-flyway.sh`):
- URL: `jdbc:postgresql://localhost:5435/job_scheduler_db`
- Username: `postgres`
- Password: `postgres`
- Changelog: `classpath:liquibase/jobstore-db-changelog.xml`
- Migration locations: `classpath:db/migration`
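As a rough sketch, the connection details above might be wired into a manual Flyway CLI invocation like the one below. The on-disk migration path is an assumption (the CLI needs a `filesystem:` location rather than the application's `classpath:` one); `-url`, `-user`, `-password`, `-locations`, `migrate`, and `info` are standard Flyway CLI arguments. The sketch prints the two commands instead of executing them, so you can review before running:

```shell
#!/usr/bin/env bash
# Hypothetical sketch of a manual Flyway invocation using the
# connection details documented above. The migrations directory
# (FLYWAY_LOCATIONS) is an assumed path, not confirmed by the repo.
set -euo pipefail

FLYWAY_URL="jdbc:postgresql://localhost:5435/job_scheduler_db"
FLYWAY_USER="postgres"
FLYWAY_PASSWORD="postgres"
FLYWAY_LOCATIONS="filesystem:jobstore-flyway/src/main/resources/db/migration"

FLYWAY_CMD="flyway -url=${FLYWAY_URL} -user=${FLYWAY_USER} -password=${FLYWAY_PASSWORD} -locations=${FLYWAY_LOCATIONS}"

# Print the apply-migrations command, then the status command.
# Drop the `echo`s to actually run them.
echo "${FLYWAY_CMD} migrate"
echo "${FLYWAY_CMD} info"
```

Running `migrate` followed by `info` mirrors what the provided script does: apply pending migrations, then display the migration status.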

## Running the Application

Expand Down Expand Up @@ -249,17 +249,17 @@ spring.datasource.url=jdbc:postgresql://localhost:5435/job_scheduler_db
spring.datasource.username=postgres
spring.datasource.password=postgres

# Liquibase Configuration
spring.liquibase.enabled=true
spring.liquibase.change-log=classpath:liquibase/jobstore-db-changelog.xml
# Flyway Configuration
spring.flyway.enabled=true
spring.flyway.locations=classpath:db/migration

# JPA/Hibernate Configuration
spring.jpa.hibernate.ddl-auto=validate
```

## Database Schema

The `jobs` table is automatically created via Liquibase with the following structure:
The `jobs` table is automatically created via Flyway with the following structure:

- `job_id` (UUID, Primary Key) - Auto-generated UUID
- `worker_id` (UUID) - UUID of the worker assigned to this job (NULL if unassigned)
Expand All @@ -272,8 +272,8 @@ The `jobs` table is automatically created via Liquibase with the following struc

### Schema Management

Database schema is managed via Liquibase changesets in the `jobstore-liquibase` module:
- `001-initial-schema.xml`: Initial schema creation with all tables and columns
Database schema is managed via Flyway migrations in the `jobstore-flyway` module:
- `V1__create_jobs_table.sql`: Initial schema creation with all tables and columns

The schema includes:
- `jobs` table with all required columns (`assigned_task_name`, `assigned_task_start_time`, etc.)
Expand All @@ -282,9 +282,9 @@ The schema includes:
- Default values for `priority` (10) and `retry_attempts_remaining` (0)

**Note**:
- Liquibase migrations run automatically when the Spring Boot application starts. The application will apply any pending migrations on startup.
- To run migrations manually before starting the application, use the `run-liquibase.sh` script (see [Database Setup](#database-setup) section for details).
- For manual execution, you can also use the Liquibase CLI directly or restart the application to trigger migrations.
- Flyway migrations run automatically when the Spring Boot application starts. The application will apply any pending migrations on startup.
- To run migrations manually before starting the application, use the `run-flyway.sh` script (see [Database Setup](#database-setup) section for details).
- For manual execution, you can also use the Flyway CLI directly or restart the application to trigger migrations.

## Retry Mechanism

Expand Down
2 changes: 1 addition & 1 deletion TASK_MANAGER_SERVICE_EXPLANATION.md
Expand Up @@ -538,7 +538,7 @@ The system uses PostgreSQL with:
- `assigned_task_name`: Name of the task to execute (TEXT)
- `assigned_task_start_time`: Scheduled start time (TIMESTAMP WITH TIME ZONE)
- **Indexes**: On `worker_id`, `assigned_task_start_time`, `priority`
- **Liquibase**: Schema management via changesets
- **Flyway**: Schema management via migration scripts

---

Expand Down
4 changes: 3 additions & 1 deletion build.gradle
Expand Up @@ -6,7 +6,7 @@ plugins {
}

group = 'uk.gov.hmcts.cp'
version = System.getProperty('API_SPEC_VERSION') ?: '1.0.0'
version = System.getProperty('API_SPEC_VERSION') ?: '1.0.4'
description = 'Spring Job Scheduler'

def githubActor = project.findProperty("github.actor") ?: System.getenv("GITHUB_ACTOR")
Expand Down Expand Up @@ -55,6 +55,8 @@ subprojects {
// Lombok - using latest version for Java compatibility
compileOnly 'org.projectlombok:lombok:1.18.42'
annotationProcessor 'org.projectlombok:lombok:1.18.42'
testCompileOnly "org.projectlombok:lombok:1.18.42"
testAnnotationProcessor "org.projectlombok:lombok:1.18.42"

// Test
testImplementation 'org.springframework.boot:spring-boot-starter-test'
Expand Down
4 changes: 1 addition & 3 deletions example-application/build.gradle
Expand Up @@ -6,7 +6,7 @@ dependencies {
// Depend on task-manager-service
implementation project(':task-manager-service')

// Depend on jobstore-liquibase for database schema
// Depend on jobstore-flyway for database schema
implementation project(':jobstore-flyway')

// Spring Boot Web
Expand All @@ -17,8 +17,6 @@ dependencies {
runtimeOnly "org.flywaydb:flyway-database-postgresql"

// Flyway for Database Migration
implementation 'org.springframework.boot:spring-boot-starter-flyway'
implementation 'org.liquibase:liquibase-core'
implementation "org.flywaydb:flyway-core"

// Spring Boot Flyway starter for auto-configuration
Expand Down
2 changes: 0 additions & 2 deletions example-application/src/main/resources/application.properties
Expand Up @@ -33,8 +33,6 @@ logging.level.uk.gov.hmcts.cp.taskmanager=DEBUG
logging.level.org.springframework.web=DEBUG
logging.level.org.hibernate.SQL=DEBUG
logging.level.org.hibernate.type.descriptor.sql.BasicBinder=TRACE
logging.level.liquibase=DEBUG
logging.level.org.springframework.boot.autoconfigure.liquibase=DEBUG
logging.level.org.springframework.boot.autoconfigure=DEBUG
debug=true

2 changes: 1 addition & 1 deletion integration-tests/README.md
Expand Up @@ -88,7 +88,7 @@ The module includes test task implementations used for integration testing:

## Test Database

The integration tests use an **H2 in-memory database** configured in `application-test.properties`. The database schema is managed via Liquibase using the same changesets as the main application.
The integration tests use an **H2 in-memory database** configured in `application-test.properties`. The database schema is managed via Flyway using the same migration scripts as the main application.

## Configuration

Expand Down
1 change: 0 additions & 1 deletion integration-tests/build.gradle
Expand Up @@ -19,7 +19,6 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.boot:spring-boot-starter-jdbc'
implementation 'org.springframework.boot:spring-boot-starter-flyway'
implementation "org.flywaydb:flyway-core"

testImplementation 'org.springframework.boot:spring-boot-starter-test'
Expand Down
@@ -1,27 +1,32 @@
package uk.gov.hmcts.cp.taskmanager.integration;

import static java.time.ZonedDateTime.now;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static uk.gov.hmcts.cp.taskmanager.domain.ExecutionStatus.COMPLETED;

import uk.gov.hmcts.cp.taskmanager.domain.ExecutionInfo;
import uk.gov.hmcts.cp.taskmanager.domain.ExecutionStatus;
import uk.gov.hmcts.cp.taskmanager.integration.config.IntegrationTestConfiguration;
import uk.gov.hmcts.cp.taskmanager.integration.persistence.TaskStatus;
import uk.gov.hmcts.cp.taskmanager.integration.persistence.TaskStatusRepository;
import uk.gov.hmcts.cp.taskmanager.persistence.entity.Job;
import uk.gov.hmcts.cp.taskmanager.persistence.repository.JobsRepository;
import uk.gov.hmcts.cp.taskmanager.service.ExecutionService;

import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import jakarta.json.Json;
import jakarta.json.JsonObject;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
Expand All @@ -41,18 +46,12 @@ class ConcurrentExecutionIntegrationTest extends PostgresIntegrationTestBase {
@Autowired
private JobsRepository jobsRepository;

private JsonObject testJobData;
@Autowired
private TaskStatusRepository taskStatusRepository;

@Autowired
JdbcTemplate jdbcTemplate;

@BeforeEach
void setUp() {
testJobData = Json.createObjectBuilder()
.add("test", "data")
.build();
}

@AfterEach
void tearDown() {
jdbcTemplate.execute("delete from JOBS");
Expand All @@ -71,10 +70,16 @@ void testMultipleJobsExecutedConcurrently() {
// Given - Create multiple jobs
final int jobCount = 5;
final ZonedDateTime startTime = now().minusSeconds(1);
final List<UUID> taskIdList = new ArrayList<>();

for (int i = 0; i < jobCount; i++) {
final UUID taskId = randomUUID();
taskIdList.add(taskId);
final ExecutionInfo executionInfo = new ExecutionInfo(
testJobData,
Json.createObjectBuilder()
.add("test", "data")
.add(ID_KEY, taskId.toString())
.build(),
"TEST_COMPLETED_TASK",
startTime,
ExecutionStatus.STARTED,
Expand All @@ -86,16 +91,26 @@ void testMultipleJobsExecutedConcurrently() {
// When - Wait for all jobs to execute
// Then - All jobs should be executed and deleted
await().atMost(java.time.Duration.ofSeconds(15)).untilAsserted(() -> {
List<Job> jobs = jobsRepository.findAll();
final List<Job> jobs = jobsRepository.findAll();
assertThat(jobs).isEmpty(); // All jobs should be completed
});
// And - All task status updated
await().atMost(java.time.Duration.ofSeconds(15)).untilAsserted(() -> {
final List<TaskStatus> tasks = taskStatusRepository.findAllById(taskIdList);
assertThat(tasks.size()).isEqualTo(jobCount);
assertThat(tasks.stream().allMatch(t -> COMPLETED.name().equals(t.getStatus()))).isTrue();
});
}

@Test
void testJobLockingPreventsDuplicateExecution() {
// Given - Create a single job
final UUID taskId = randomUUID();
final ExecutionInfo executionInfo = new ExecutionInfo(
testJobData,
Json.createObjectBuilder()
.add("test", "data")
.add(ID_KEY, taskId.toString())
.build(),
"TEST_COMPLETED_TASK",
now().minusSeconds(1),
ExecutionStatus.STARTED,
Expand All @@ -112,17 +127,29 @@ void testJobLockingPreventsDuplicateExecution() {
assertThat(job.getWorkerId() != null).isTrue();
}
});

await().atMost(java.time.Duration.ofSeconds(5)).untilAsserted(() -> {
final Optional<TaskStatus> task = taskStatusRepository.findById(taskId);
assertThat(task.isEmpty()).isFalse();
assertThat(task.stream().allMatch(t -> COMPLETED.name().equals(t.getStatus()))).isTrue();
});
}

@Test
void testBatchProcessing() {
// Given - Create more jobs than batch size
int jobCount = 15; // More than default batch size of 10
ZonedDateTime startTime = now().minusSeconds(1);
final ZonedDateTime startTime = now().minusSeconds(1);
final List<UUID> taskIdList = new ArrayList<>();

for (int i = 0; i < jobCount; i++) {
final UUID taskId = randomUUID();
taskIdList.add(taskId);
ExecutionInfo executionInfo = new ExecutionInfo(
testJobData,
Json.createObjectBuilder()
.add("test", "data")
.add(ID_KEY, taskId.toString())
.build(),
"TEST_COMPLETED_TASK",
startTime,
ExecutionStatus.STARTED,
Expand All @@ -137,22 +164,34 @@ void testBatchProcessing() {
List<Job> jobs = jobsRepository.findAll();
assertThat(jobs).isEmpty(); // All jobs should be completed
});

await().atMost(java.time.Duration.ofSeconds(30)).untilAsserted(() -> {
final List<TaskStatus> tasks = taskStatusRepository.findAllById(taskIdList);
assertThat(tasks.size()).isEqualTo(jobCount);
assertThat(tasks.stream().allMatch(t -> COMPLETED.name().equals(t.getStatus()))).isTrue();
});
}

@Test
void testConcurrentJobCreation() throws InterruptedException {
// Given - Create jobs concurrently from multiple threads
int threadCount = 5;
int jobsPerThread = 2;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
CountDownLatch latch = new CountDownLatch(threadCount * jobsPerThread);
final ExecutorService executor = newFixedThreadPool(threadCount);
final CountDownLatch latch = new CountDownLatch(threadCount * jobsPerThread);
final List<UUID> taskIdList = new ArrayList<>();

// When - Create jobs concurrently
for (int i = 0; i < threadCount; i++) {
executor.submit(() -> {
for (int j = 0; j < jobsPerThread; j++) {
final UUID taskId = randomUUID();
taskIdList.add(taskId);
ExecutionInfo executionInfo = new ExecutionInfo(
testJobData,
Json.createObjectBuilder()
.add("test", "data")
.add(ID_KEY, taskId.toString())
.build(),
"TEST_COMPLETED_TASK",
now().minusSeconds(1),
ExecutionStatus.STARTED,
Expand All @@ -173,6 +212,12 @@ void testConcurrentJobCreation() throws InterruptedException {
List<Job> jobs = jobsRepository.findAll();
assertThat(jobs).isEmpty(); // All jobs should be completed
});

await().atMost(java.time.Duration.ofSeconds(20)).untilAsserted(() -> {
final List<TaskStatus> tasks = taskStatusRepository.findAllById(taskIdList);
assertThat(tasks.size()).isEqualTo(threadCount * jobsPerThread);
assertThat(tasks.stream().allMatch(t -> COMPLETED.name().equals(t.getStatus()))).isTrue();
});
}
}

Expand Up @@ -11,8 +11,8 @@
*/
@SpringBootApplication(scanBasePackages = "uk.gov.hmcts.cp.taskmanager")
@EnableScheduling
@EnableJpaRepositories(basePackages = "uk.gov.hmcts.cp.taskmanager.persistence.repository")
@EntityScan(basePackages = "uk.gov.hmcts.cp.taskmanager.persistence.entity")
@EnableJpaRepositories(basePackages = {"uk.gov.hmcts.cp.taskmanager.persistence.repository", "uk.gov.hmcts.cp.taskmanager.integration.persistence"})
@EntityScan(basePackages = {"uk.gov.hmcts.cp.taskmanager.persistence.entity", "uk.gov.hmcts.cp.taskmanager.integration.persistence"})
public class IntegrationTestApplication {
}
