diff --git a/.codecov.yml b/.codecov.yml index c8698367..5ada55fd 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -4,11 +4,11 @@ codecov: coverage: precision: 2 round: down - range: "75...100" + range: "70...100" status: project: default: - target: 75% # the required coverage value + target: 70% # the required coverage value threshold: 1% # the leniency in hitting the target # Specify coverage report paths for multi-project build diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/JobExecutionContext.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/JobExecutionContext.java index c750019e..040f74e4 100644 --- a/spi/src/main/java/org/opensearch/jobscheduler/spi/JobExecutionContext.java +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/JobExecutionContext.java @@ -72,5 +72,4 @@ public String getJobIndexName() { public String getJobId() { return this.jobId; } - } diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/StatusHistoryModel.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/StatusHistoryModel.java new file mode 100644 index 00000000..c25d3918 --- /dev/null +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/StatusHistoryModel.java @@ -0,0 +1,186 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.spi; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParserUtils; +import org.opensearch.index.seqno.SequenceNumbers; + +import java.io.IOException; +import java.time.Instant; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +public final class StatusHistoryModel implements ToXContentObject, Writeable { + public static final String JOB_INDEX_NAME = "job_index_name"; + public static final String JOB_ID = "job_id"; + public static final String START_TIME = "start_time"; + public static final String END_TIME = "end_time"; + public static final String COMPLETION_STATUS = "completion_status"; + + private final String jobIndexName; + private final String jobId; + private final Instant startTime; + private final Instant endTime; + private final int status; + private final long seqNo; + private final long primaryTerm; + + public StatusHistoryModel(String jobIndexName, String jobId, Instant startTime, Instant endTime, int status) { + this(jobIndexName, jobId, startTime, endTime, status, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); + } + + public StatusHistoryModel( + String jobIndexName, + String jobId, + Instant startTime, + Instant endTime, + int status, + long seqNo, + long primaryTerm + ) { + this.jobIndexName = jobIndexName; + this.jobId = jobId; + this.startTime = startTime; + this.endTime = endTime; + this.status = status; + this.seqNo = seqNo; + this.primaryTerm = primaryTerm; + } + + public StatusHistoryModel(StreamInput in) throws IOException { + this(in.readString(), in.readString(), in.readInstant(), in.readOptionalInstant(), in.readInt(), in.readLong(), in.readLong()); + } + + public static StatusHistoryModel parse(final XContentParser parser, long seqNo, long primaryTerm) throws IOException { + String jobIndexName = null; + String jobId = null; + Instant startTime = null; + Instant endTime = null; + Integer status = null; + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case JOB_INDEX_NAME: + jobIndexName = parser.text(); + break; + case JOB_ID: + jobId = parser.text(); + break; + case START_TIME: + startTime = Instant.ofEpochSecond(parser.longValue()); + break; + case END_TIME: + if (parser.currentToken() != XContentParser.Token.VALUE_NULL) { + endTime = Instant.ofEpochSecond(parser.longValue()); + } + break; + case COMPLETION_STATUS: + status = parser.intValue(); + break; + default: + throw new IllegalArgumentException("Unknown field " + fieldName); + } + } + + return new StatusHistoryModel( + requireNonNull(jobIndexName, "JobIndexName cannot be null"), + requireNonNull(jobId, "JobId cannot be null"), + requireNonNull(startTime, "startTime cannot be null"), + endTime, + requireNonNull(status, "status cannot be null"), + seqNo, + primaryTerm + ); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject() + .field(JOB_INDEX_NAME, this.jobIndexName) + .field(JOB_ID, this.jobId) + .field(START_TIME, this.startTime.getEpochSecond()) + .field(COMPLETION_STATUS, this.status); + + if (this.endTime != null) { + builder.field(END_TIME, this.endTime.getEpochSecond()); + } else { + builder.nullField(END_TIME); + } + + return builder.endObject(); + } + + public String getJobIndexName() { + return jobIndexName; + } + + public String getJobId() { + return jobId; + } + + public Instant getStartTime() { + return startTime; + } + + public Instant getEndTime() { + return endTime; + } + + public int getStatus() { + return status; + } + + public long getSeqNo() { + return seqNo; + } + + public long getPrimaryTerm() { + return primaryTerm; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + StatusHistoryModel that = (StatusHistoryModel) o; + return seqNo == that.seqNo + && primaryTerm == that.primaryTerm + && jobIndexName.equals(that.jobIndexName) + && jobId.equals(that.jobId) + && startTime.equals(that.startTime) + && Objects.equals(endTime, that.endTime) + && status == that.status; + } + + @Override + public int hashCode() { + return Objects.hash(jobIndexName, jobId, startTime, endTime, status, seqNo, primaryTerm); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(this.jobIndexName); + out.writeString(this.jobId); + out.writeInstant(this.startTime); + out.writeOptionalInstant(this.endTime); + out.writeInt(this.status); + out.writeLong(this.seqNo); + out.writeLong(this.primaryTerm); + } +} diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/JobHistoryService.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/JobHistoryService.java new file mode 100644 index 00000000..2f889cce --- /dev/null +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/JobHistoryService.java @@ -0,0 +1,261 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.spi.utils; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ResourceAlreadyExistsException; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.update.UpdateRequest; +import org.opensearch.action.get.GetRequest; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.engine.VersionConflictEngineException; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.xcontent.MediaType; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.jobscheduler.spi.StatusHistoryModel; +import org.opensearch.transport.client.Client; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.Map; + +public class JobHistoryService { + private static final Logger logger = LogManager.getLogger(JobHistoryService.class); + public static final String JOB_HISTORY_INDEX_NAME = ".job-scheduler-history"; + + private final Client client; + private final ClusterService clusterService; + final static Map INDEX_SETTINGS = Map.of("index.number_of_shards", 1, "index.auto_expand_replicas", "0-1"); + + public JobHistoryService(final Client client, final ClusterService clusterService) { + this.client = client; + this.clusterService = clusterService; + } + + private String historyMapping() { + try { + InputStream in = JobHistoryService.class.getResourceAsStream("job_scheduler_history.json"); + StringBuilder stringBuilder = new StringBuilder(); + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); + for (String line; (line = bufferedReader.readLine()) != null;) { + stringBuilder.append(line); + } + return stringBuilder.toString(); + } catch (IOException e) { + throw new IllegalArgumentException("History Mapping cannot be read correctly."); + } + } + + public boolean historyIndexExist() { + return clusterService.state().routingTable().hasIndex(JOB_HISTORY_INDEX_NAME); + } + + void createHistoryIndex(ActionListener listener) { + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { + if (historyIndexExist()) { + listener.onResponse(true); + } else { + final CreateIndexRequest request = new CreateIndexRequest(JOB_HISTORY_INDEX_NAME).mapping( + historyMapping(), + (MediaType) XContentType.JSON + ).settings(INDEX_SETTINGS); + client.admin() + .indices() + .create(request, ActionListener.wrap(response -> listener.onResponse(response.isAcknowledged()), exception -> { + if (exception instanceof ResourceAlreadyExistsException + || exception.getCause() instanceof ResourceAlreadyExistsException) { + listener.onResponse(true); + } else { + listener.onFailure(exception); + } + })); + } + } catch (Exception e) { + logger.error(e); + listener.onFailure(e); + } + } + + /** + * Records job execution history to the history index. + * + * @param startTime the time when job execution started + * @param endTime the time when job execution ended (can be null for ongoing jobs) + * @param status the execution status (e.g., "RUNNING", "SUCCESS", "FAILED") + * @param listener callback for handling the response + */ + public void recordJobHistory( + final String jobIndexName, + final String jobId, + final Instant startTime, + final Instant endTime, + final Integer status, + ActionListener listener + ) { + if (jobIndexName == null || jobId == null || startTime == null) { + listener.onFailure(new IllegalArgumentException("JobIndexName, JobId, StartTime, and Status cannot be null")); + return; + } + + createHistoryIndex(ActionListener.wrap(created -> { + if (created) { + findHistoryRecord(jobIndexName, jobId, startTime, ActionListener.wrap(existingRecord -> { + if (existingRecord != null) { + // Update existing record + StatusHistoryModel updatedModel = new StatusHistoryModel( + jobIndexName, + jobId, + startTime, + endTime, + status, + existingRecord.getSeqNo(), + existingRecord.getPrimaryTerm() + ); + updateHistoryRecord( + updatedModel, + ActionListener.wrap(updated -> listener.onResponse(updated != null), listener::onFailure) + ); + } else { + // Create new record + try { + StatusHistoryModel historyModel = new StatusHistoryModel(jobIndexName, jobId, startTime, endTime, status); + createHistoryRecord(historyModel, listener); + } catch (Exception e) { + logger.error("Failed to create history record", e); + listener.onFailure(e); + } + } + }, listener::onFailure)); + } else { + listener.onResponse(false); + } + }, listener::onFailure)); + } + + private void createHistoryRecord(final StatusHistoryModel historyModel, ActionListener listener) { + try { + String historyId = generateHistoryId(historyModel.getJobIndexName(), historyModel.getJobId(), historyModel.getStartTime()); + + final IndexRequest request = new IndexRequest(JOB_HISTORY_INDEX_NAME).id(historyId) + .source(historyModel.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .setIfSeqNo(SequenceNumbers.UNASSIGNED_SEQ_NO) + .create(true); + + client.index(request, ActionListener.wrap(response -> { + logger.debug("Successfully recorded job history: {}", historyModel); + listener.onResponse(true); + }, exception -> { + if (exception instanceof VersionConflictEngineException) { + logger.debug("History record already exists: {}", exception.getMessage()); + } + listener.onFailure(exception); + })); + } catch (IOException e) { + logger.error("IOException occurred creating history record", e); + listener.onFailure(e); + } + } + + public void updateHistoryRecord(final StatusHistoryModel historyModelupdate, ActionListener listener) { + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { + String documentId = generateHistoryId( + historyModelupdate.getJobIndexName(), + historyModelupdate.getJobId(), + historyModelupdate.getStartTime() + ); + + UpdateRequest updateRequest = new UpdateRequest().index(JOB_HISTORY_INDEX_NAME) + .id(documentId) + .setIfSeqNo(historyModelupdate.getSeqNo()) + .setIfPrimaryTerm(historyModelupdate.getPrimaryTerm()) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .doc(historyModelupdate.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .fetchSource(true); + + client.update( + updateRequest, + ActionListener.wrap( + response -> listener.onResponse( + new StatusHistoryModel( + historyModelupdate.getJobIndexName(), + historyModelupdate.getJobId(), + historyModelupdate.getStartTime(), + historyModelupdate.getEndTime(), + historyModelupdate.getStatus(), + response.getSeqNo(), + response.getPrimaryTerm() + ) + ), + exception -> { + if (exception instanceof VersionConflictEngineException) { + logger.debug("Version conflict updating history record: {}", exception.getMessage()); + } + listener.onResponse(null); + } + ) + ); + } catch (IOException e) { + listener.onFailure(e); + } + } + + public void findHistoryRecord( + final String jobIndexName, + final String jobId, + final Instant startTime, + ActionListener listener + ) { + try (ThreadContext.StoredContext ignore = client.threadPool().getThreadContext().stashContext()) { + String historyId = generateHistoryId(jobIndexName, jobId, startTime); + GetRequest getRequest = new GetRequest(JOB_HISTORY_INDEX_NAME).id(historyId); + + client.get(getRequest, ActionListener.wrap(response -> { + if (!response.isExists()) { + listener.onResponse(null); + } else { + try { + XContentParser parser = XContentType.JSON.xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString()); + parser.nextToken(); + listener.onResponse(StatusHistoryModel.parse(parser, response.getSeqNo(), response.getPrimaryTerm())); + } catch (IOException e) { + logger.error("IOException occurred parsing history record", e); + listener.onResponse(null); + } + } + }, exception -> { + if (exception.getMessage() != null && exception.getMessage().contains("no such index")) { + listener.onResponse(null); + } else { + listener.onFailure(exception); + } + })); + } catch (Exception e) { + listener.onFailure(e); + } + } + + private String generateHistoryId(String jobIndexName, String jobId, Instant startTime) { + return jobIndexName + "-" + jobId + "-" + startTime.getEpochSecond(); + } +} diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java index f732a14a..bf545ed0 100644 --- a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java @@ -45,6 +45,7 @@ import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.Map; +import java.util.function.Supplier; public class LockService { private static final Logger logger = LogManager.getLogger(LockService.class); @@ -53,13 +54,29 @@ public class LockService { private final Client client; private final ClusterService clusterService; final static Map INDEX_SETTINGS = Map.of("index.number_of_shards", 1, "index.auto_expand_replicas", "0-1"); + private final JobHistoryService historyService; + private final Supplier statusHistoryEnabled; // This is used in tests to control time. private Instant testInstant = null; + public LockService( + final Client client, + final ClusterService clusterService, + JobHistoryService historyService, + Supplier statusHistoryEnabled + ) { + this.client = client; + this.clusterService = clusterService; + this.historyService = historyService; + this.statusHistoryEnabled = statusHistoryEnabled; + } + public LockService(final Client client, final ClusterService clusterService) { this.client = client; this.clusterService = clusterService; + this.historyService = null; + this.statusHistoryEnabled = () -> false; } private String lockMapping() { @@ -125,7 +142,23 @@ public void acquireLock( final String jobIndexName = context.getJobIndexName(); final String jobId = context.getJobId(); final long lockDurationSeconds = jobParameter.getLockDurationSeconds(); - acquireLockWithId(jobIndexName, lockDurationSeconds, jobId, listener); + + acquireLockWithId(jobIndexName, lockDurationSeconds, jobId, ActionListener.wrap(lock -> { + + if (lock != null && statusHistoryEnabled.get() && historyService != null) { + historyService.recordJobHistory( + jobIndexName, + jobId, + lock.getLockTime(), + null, + 1, + ActionListener.wrap(success -> listener.onResponse(lock), failure -> listener.onResponse(lock)) + ); + } else { + listener.onResponse(lock); + } + + }, listener::onFailure)); } /** @@ -303,6 +336,17 @@ public void release(final LockModel lock, ActionListener listener) { } else { logger.debug("Releasing lock: " + lock); final LockModel lockToRelease = new LockModel(lock, true); + + if (statusHistoryEnabled.get() && historyService != null) { + historyService.recordJobHistory( + lock.getJobIndexName(), + lock.getJobId(), + lock.getLockTime(), + Instant.now(), + 0, + ActionListener.wrap(success -> {}, listener::onFailure) + ); + } updateLock(lockToRelease, ActionListener.wrap(releasedLock -> listener.onResponse(releasedLock != null), listener::onFailure)); } } diff --git a/spi/src/main/resources/org/opensearch/jobscheduler/spi/utils/job_scheduler_history.json b/spi/src/main/resources/org/opensearch/jobscheduler/spi/utils/job_scheduler_history.json new file mode 100644 index 00000000..3a0a56f6 --- /dev/null +++ b/spi/src/main/resources/org/opensearch/jobscheduler/spi/utils/job_scheduler_history.json @@ -0,0 +1,21 @@ +{ + "properties": { + "job_index_name": { + "type": "keyword" + }, + "job_id": { + "type": "keyword" + }, + "start_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "end_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "status": { + "type": "integer" + } + } +} diff --git a/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/JobHistoryServiceIT.java b/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/JobHistoryServiceIT.java new file mode 100644 index 00000000..69d9838a --- /dev/null +++ b/spi/src/test/java/org/opensearch/jobscheduler/spi/utils/JobHistoryServiceIT.java @@ -0,0 +1,237 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.spi.utils; + +import org.junit.Before; +import org.mockito.Mockito; +import org.opensearch.core.action.ActionListener; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.jobscheduler.spi.StatusHistoryModel; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.time.Instant; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class JobHistoryServiceIT extends OpenSearchIntegTestCase { + + private ClusterService clusterService; + static final String JOB_ID = "test_job_id"; + static final String JOB_INDEX_NAME = "test_job_index_name"; + + @Before + public void setup() { + this.clusterService = Mockito.mock(ClusterService.class, Mockito.RETURNS_DEEP_STUBS); + Mockito.when(this.clusterService.state().routingTable().hasIndex(JobHistoryService.JOB_HISTORY_INDEX_NAME)) + .thenReturn(false) + .thenReturn(true); + } + + public void testRecordJobHistorySanity() throws Exception { + String uniqSuffix = "_record_sanity"; + CountDownLatch latch = new CountDownLatch(1); + JobHistoryService historyService = new JobHistoryService(client(), this.clusterService); + String jobIndexName = "test-job-index" + uniqSuffix; + String jobId = "test-job-id" + uniqSuffix; + Instant startTime = Instant.now(); + Integer status = 1; + historyService.recordJobHistory(jobIndexName, jobId, startTime, null, status, ActionListener.wrap(result -> { + assertTrue("Failed to record job history", result); + latch.countDown(); + }, exception -> fail("Exception occurred: " + exception.getMessage()))); + + latch.await(10L, TimeUnit.SECONDS); + } + + public void testUpdateJobHistory() throws Exception { + String uniqSuffix = "_update_history"; + CountDownLatch latch = new CountDownLatch(1); + JobHistoryService historyService = new JobHistoryService(client(), this.clusterService); + String jobIndexName = "test-job-index" + uniqSuffix; + String jobId = "test-job-id" + uniqSuffix; + Instant startTime = Instant.now(); + Instant endTime = startTime.plusSeconds(60); + // First record + historyService.recordJobHistory(jobIndexName, jobId, startTime, null, 1, ActionListener.wrap(result -> { + assertTrue("Failed to record initial job history", result); + // Update with end time + historyService.recordJobHistory(jobIndexName, jobId, startTime, endTime, 2, ActionListener.wrap(updateResult -> { + assertTrue("Failed to update job history", updateResult); + latch.countDown(); + }, exception -> fail("Exception during update: " + exception.getMessage()))); + }, exception -> fail("Exception during initial record: " + exception.getMessage()))); + + latch.await(15L, TimeUnit.SECONDS); + } + + public void testFindHistoryRecord() throws Exception { + String uniqSuffix = "_find_record"; + CountDownLatch latch = new CountDownLatch(1); + JobHistoryService historyService = new JobHistoryService(client(), this.clusterService); + String jobIndexName = "test-job-index" + uniqSuffix; + String jobId = "test-job-id" + uniqSuffix; + Instant startTime = Instant.now(); + // Record first + historyService.recordJobHistory(jobIndexName, jobId, startTime, null, 1, ActionListener.wrap(result -> { + assertTrue("Failed to record job history", result); + // Find the record + historyService.findHistoryRecord(jobIndexName, jobId, startTime, ActionListener.wrap(historyModel -> { + assertNotNull("History record should exist", historyModel); + assertEquals("Job index name mismatch", jobIndexName, historyModel.getJobIndexName()); + assertEquals("Job ID mismatch", jobId, historyModel.getJobId()); + assertEquals("Start time mismatch", startTime.getEpochSecond(), historyModel.getStartTime().getEpochSecond()); + assertEquals("Status mismatch", 1, historyModel.getStatus()); + latch.countDown(); + }, exception -> fail("Exception during find: " + exception.getMessage()))); + }, exception -> fail("Exception during record: " + exception.getMessage()))); + + latch.await(15L, TimeUnit.SECONDS); + } + + public void testFindNonExistentRecord() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + JobHistoryService historyService = new JobHistoryService(client(), this.clusterService); + historyService.findHistoryRecord("non-existent-index", "non-existent-job", Instant.now(), ActionListener.wrap(historyModel -> { + assertNull("Non-existent record should return null", historyModel); + latch.countDown(); + }, exception -> fail("Exception should not occur for non-existent record: " + exception.getMessage()))); + + latch.await(10L, TimeUnit.SECONDS); + } + + public void testRecordJobHistoryWithNullJobIndexName() throws Exception { + String uniqSuffix = "_nullIndex"; + CountDownLatch latch = new CountDownLatch(1); + JobHistoryService historyService = new JobHistoryService(client(), this.clusterService); + historyService.recordJobHistory( + null, + "test-job", + Instant.now(), + null, + 1, + ActionListener.wrap(result -> fail("Should have failed with null job index name"), exception -> { + assertTrue("Should be IllegalArgumentException", exception instanceof IllegalArgumentException); + assertTrue("Should mention null parameter", exception.getMessage().contains("cannot be null")); + latch.countDown(); + }) + ); + + latch.await(10L, TimeUnit.SECONDS); + } + + public void testRecordJobHistoryWithNullJobId() throws Exception { + String uniqSuffix = "nullJobId"; + CountDownLatch latch = new CountDownLatch(1); + JobHistoryService historyService = new JobHistoryService(client(), this.clusterService); + historyService.recordJobHistory( + "test-index", + null, + Instant.now(), + null, + 1, + ActionListener.wrap(result -> fail("Should have failed with null job id"), exception -> { + assertTrue("Should be IllegalArgumentException", exception instanceof IllegalArgumentException); + assertTrue("Should mention null parameter", exception.getMessage().contains("cannot be null")); + latch.countDown(); + }) + ); + + latch.await(10L, TimeUnit.SECONDS); + } + + public void testRecordJobHistoryWithNullStartTime() throws Exception { + String uniqSuffix = "nullJobId"; + CountDownLatch latch = new CountDownLatch(1); + JobHistoryService historyService = new JobHistoryService(client(), this.clusterService); + historyService.recordJobHistory( + "test-index", + "test-job", + null, + null, + 1, + ActionListener.wrap(result -> fail("Should have failed with null start time"), exception -> { + assertTrue("Should be IllegalArgumentException", exception instanceof IllegalArgumentException); + assertTrue("Should mention null parameter", exception.getMessage().contains("cannot be null")); + latch.countDown(); + }) + ); + + latch.await(10L, TimeUnit.SECONDS); + } + + public void testHistoryIndexCreation() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + JobHistoryService historyService = new JobHistoryService(client(), this.clusterService); + historyService.createHistoryIndex(ActionListener.wrap(created -> { + assertTrue("History index should be created", created); + latch.countDown(); + }, exception -> fail("Exception during index creation: " + exception.getMessage()))); + + latch.await(10L, TimeUnit.SECONDS); + } + + public void testRecordJobHistoryWithEndTime() throws Exception { + String uniqSuffix = "_with_end_time"; + CountDownLatch latch = new CountDownLatch(1); + JobHistoryService historyService = new JobHistoryService(client(), this.clusterService); + String jobIndexName = "test-job-index" + uniqSuffix; + String jobId = "test-job-id" + uniqSuffix; + Instant startTime = Instant.now(); + Instant endTime = startTime.plusSeconds(30); + Integer status = 2; + historyService.recordJobHistory(jobIndexName, jobId, startTime, endTime, status, ActionListener.wrap(result -> { + assertTrue("Failed to record job history with end time", result); + // Verify the record was created with end time + historyService.findHistoryRecord(jobIndexName, jobId, startTime, ActionListener.wrap(historyModel -> { + assertNotNull("History record should exist", historyModel); + assertEquals("End time mismatch", endTime.getEpochSecond(), historyModel.getEndTime().getEpochSecond()); + assertEquals("Status mismatch", status.intValue(), historyModel.getStatus()); + latch.countDown(); + }, exception -> fail("Exception during find: " + exception.getMessage()))); + }, exception -> fail("Exception occurred: " + exception.getMessage()))); + + latch.await(15L, TimeUnit.SECONDS); + } + + public void testUpdateHistoryRecordDirectly() throws Exception { + String uniqSuffix = "_direct_update"; + CountDownLatch latch = new CountDownLatch(1); + JobHistoryService historyService = new JobHistoryService(client(), this.clusterService); + String jobIndexName = "test-job-index" + uniqSuffix; + String jobId = "test-job-id" + uniqSuffix; + Instant startTime = Instant.now(); + // First create a record + historyService.recordJobHistory(jobIndexName, jobId, startTime, null, 1, ActionListener.wrap(result -> { + assertTrue("Failed to record initial job history", result); + // Find the record to get seq_no and primary_term + historyService.findHistoryRecord(jobIndexName, jobId, startTime, ActionListener.wrap(historyModel -> { + assertNotNull("History record should exist", historyModel); + // Create updated model + StatusHistoryModel updatedModel = new StatusHistoryModel( + jobIndexName, + jobId, + startTime, + Instant.now(), + 3, + historyModel.getSeqNo(), + historyModel.getPrimaryTerm() + ); + // Update directly + historyService.updateHistoryRecord(updatedModel, ActionListener.wrap(updatedHistoryModel -> { + assertNotNull("Updated history model should not be null", updatedHistoryModel); + assertEquals("Status should be updated", 3, updatedHistoryModel.getStatus()); + assertNotNull("End time should be set", updatedHistoryModel.getEndTime()); + latch.countDown(); + }, exception -> fail("Exception during update: " + exception.getMessage()))); + }, exception -> fail("Exception during find: " + exception.getMessage()))); + }, exception -> fail("Exception during initial record: " + exception.getMessage()))); + + latch.await(20L, TimeUnit.SECONDS); + } +} diff --git a/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java b/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java index f696c08b..3ab507a2 100644 --- a/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java +++ b/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java @@ -46,6 +46,7 @@ import org.opensearch.index.IndexModule; import org.opensearch.indices.SystemIndexDescriptor; import org.opensearch.jobscheduler.utils.JobDetailsService; +import org.opensearch.jobscheduler.spi.utils.JobHistoryService; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.ExtensiblePlugin; import org.opensearch.plugins.Plugin; @@ -78,6 +79,7 @@ public class JobSchedulerPlugin extends Plugin implements ActionPlugin, Extensib private JobSweeper sweeper; private JobScheduler scheduler; private LockService lockService; + private JobHistoryService historyService; private Map indexToJobProviders; private Set indicesToListen; @@ -117,7 +119,9 @@ public Collection createComponents( IndexNameExpressionResolver indexNameExpressionResolver, Supplier repositoriesServiceSupplier ) { - this.lockService = new LockService(client, clusterService); + Supplier statusHistoryEnabled = () -> JobSchedulerSettings.STATUS_HISTORY.get(environment.settings()); + this.historyService = new JobHistoryService(client, clusterService); + this.lockService = new LockService(client, clusterService, historyService, statusHistoryEnabled); this.jobDetailsService = new JobDetailsService(client, clusterService, this.indicesToListen, this.indexToJobProviders); this.scheduler = new JobScheduler(threadPool, this.lockService); this.sweeper = initSweeper( @@ -151,6 +155,7 @@ public List> getSettings() { settingList.add(JobSchedulerSettings.SWEEP_BACKOFF_RETRY_COUNT); settingList.add(JobSchedulerSettings.SWEEP_PERIOD); settingList.add(JobSchedulerSettings.JITTER_LIMIT); + settingList.add(JobSchedulerSettings.STATUS_HISTORY); return settingList; } diff --git a/src/main/java/org/opensearch/jobscheduler/JobSchedulerSettings.java b/src/main/java/org/opensearch/jobscheduler/JobSchedulerSettings.java index fa9b8d31..0a5ed5dd 100644 --- a/src/main/java/org/opensearch/jobscheduler/JobSchedulerSettings.java +++ b/src/main/java/org/opensearch/jobscheduler/JobSchedulerSettings.java @@ -53,4 +53,11 @@ public class JobSchedulerSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ); + + public static final Setting STATUS_HISTORY = Setting.boolSetting( + "plugins.jobscheduler.history.enabled", + false, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); } diff --git a/src/test/java/org/opensearch/jobscheduler/JobSchedulerPluginTests.java b/src/test/java/org/opensearch/jobscheduler/JobSchedulerPluginTests.java index 232cb542..0ee4b816 100644 --- a/src/test/java/org/opensearch/jobscheduler/JobSchedulerPluginTests.java +++ b/src/test/java/org/opensearch/jobscheduler/JobSchedulerPluginTests.java @@ -137,7 +137,7 @@ public void testLoadExtensions() { public void testGetSettings_returnsSettingsList() { List> settings = plugin.getSettings(); assertNotNull(settings); - assertEquals(12, settings.size()); + assertEquals(13, settings.size()); assertTrue(settings.contains(LegacyOpenDistroJobSchedulerSettings.SWEEP_PAGE_SIZE)); assertTrue(settings.contains(LegacyOpenDistroJobSchedulerSettings.REQUEST_TIMEOUT)); assertTrue(settings.contains(LegacyOpenDistroJobSchedulerSettings.SWEEP_BACKOFF_MILLIS));