Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
3a79f49
remove guava dependency (#773)
jngz-es May 8, 2025
04b175e
added Request API, Action API and Test
Jun 9, 2025
e776d9c
changing file names
Jun 9, 2025
0a6f46c
need to resolve permission issues/ changes to gather all scheduled jo…
Jun 9, 2025
1ffaf43
adding to RestGetSchedulingAction
Jun 10, 2025
287b89d
adding additionl tests
Jun 10, 2025
92398d5
Added IT test for ScheduleInfo
Jun 11, 2025
3dd6186
updating progres - verified API calls in terminal
Jun 12, 2025
188e5ef
adding new values to prepare request
Jun 13, 2025
5d3fafb
removing unneeded code
Jun 16, 2025
2260586
adding transport level API
Jun 18, 2025
8242d31
adding job type and refactoring json output
Jun 19, 2025
9a0bd1f
adding node selection, duplicate filtering and total job count
Jun 19, 2025
3b79d11
removing node specific API
Jun 19, 2025
c0d100f
adding integration tests
Jun 20, 2025
66e6275
updating Scheduled JobInfoIT
Jun 23, 2025
4572873
removing comments
Jun 24, 2025
9c214df
Adding comments
Jun 24, 2025
a9e3489
removing unnessessary tests
Jun 24, 2025
a12301b
Add a CHANGELOG and changelog_verifier workflow (#778)
cwperks Jun 12, 2025
363e354
Add 3.1.0 release notes (#779)
prudhvigodithi Jun 13, 2025
3fe788a
Update Maven snapshots publishing endpoint and credential retrieval (…
zelinh Jun 23, 2025
8c33308
Increment version to 3.2.0 (#783)
cwperks Jun 23, 2025
7bf5dbf
removing .idea files
Jun 24, 2025
3816f43
fixing formatting issues
Jun 24, 2025
9dfb943
adding license header
Jun 24, 2025
8fd9c80
change api call, add testing functionality
Jun 25, 2025
e0b6793
stop tracking .idea/*
Jun 25, 2025
7fe4813
changing constant calls
Jun 26, 2025
ba0aabd
adding additional testing fields
Jun 26, 2025
45af721
changing permissions
Jun 26, 2025
66b115f
removing file
Jun 26, 2025
11a297c
adding files -s
Jun 30, 2025
35fa41f
reset
Jun 30, 2025
3c4ba6c
adding to reset
Jun 30, 2025
a5d4da3
deleting comment
Jun 30, 2025
b0db328
Merge branch 'main' into lists-jobs
Jeremydupras Jun 30, 2025
61530d3
adding .idea files
Jul 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .idea/copyright/SPDX_ALv2.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .idea/copyright/profiles_settings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@

import org.opensearch.action.support.WriteRequest;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.jobscheduler.spi.schedule.CronSchedule;
import org.opensearch.core.action.ActionListener;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.jobscheduler.spi.schedule.Schedule;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
Expand All @@ -25,6 +27,7 @@

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -33,14 +36,15 @@
/**
* A sample rest handler that supports schedule and deschedule job operation
*
* Users need to provide "id", "index", "job_name", and "interval" parameter to schedule
* Users need to provide "id", "index", "job_name", and either "interval" or "cron" parameter to schedule
* a job. e.g.
* {@code
* POST /_plugins/scheduler_sample/watch?id=dashboards-job-id&job_name=watch dashboards index&index=.opensearch_dashboards_1&interval=1
* POST /_plugins/scheduler_sample/watch?id=dashboards-job-id&job_name=watch dashboards index&index=.opensearch_dashboards_1&cron=0 9 * * MON
* }
*
* creates a job with id "dashboards-job-id" and job name "watch dashboards index",
* which logs ".opensearch_dashboards_1" index's shards info every 1 minute
* which logs ".opensearch_dashboards_1" index's shards info every 1 minute or every Monday at 9 AM
*
* Users can remove that job by calling
* {@code DELETE /_plugins/scheduler_sample/watch?id=dashboards-job-id}
Expand Down Expand Up @@ -68,6 +72,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
String indexName = request.param("index");
String jobName = request.param("job_name");
String interval = request.param("interval");
String cron = request.param("cron");
String lockDurationSecondsString = request.param("lock_duration_seconds");
Long lockDurationSeconds = lockDurationSecondsString != null ? Long.parseLong(lockDurationSecondsString) : null;
String jitterString = request.param("jitter");
Expand All @@ -76,14 +81,21 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
if (id == null || indexName == null) {
throw new IllegalArgumentException("Must specify id and index parameter");
}
SampleJobParameter jobParameter = new SampleJobParameter(
id,
jobName,
indexName,
new IntervalSchedule(Instant.now(), Integer.parseInt(interval), ChronoUnit.MINUTES),
lockDurationSeconds,
jitter
);
if (interval == null && cron == null) {
throw new IllegalArgumentException("Must specify either interval or cron parameter");
}
if (interval != null && cron != null) {
throw new IllegalArgumentException("Cannot specify both interval and cron parameters");
}

Schedule schedule;
if (interval != null) {
schedule = new IntervalSchedule(Instant.now(), Integer.parseInt(interval), ChronoUnit.MINUTES);
} else {
schedule = new CronSchedule(cron, ZoneId.systemDefault());
}

SampleJobParameter jobParameter = new SampleJobParameter(id, jobName, indexName, schedule, lockDurationSeconds, jitter);
IndexRequest indexRequest = new IndexRequest().index(SampleExtensionPlugin.JOB_INDEX_NAME)
.id(id)
.source(jobParameter.toXContent(JsonXContent.contentBuilder(), null))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.sampleextension;

import org.opensearch.client.Response;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.jobscheduler.spi.schedule.CronSchedule;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.junit.Before;
import org.junit.After;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Collections;
import java.util.Set;
import java.util.HashSet;

public class GetScheduledJobInfoIT extends SampleExtensionIntegTestCase {

@Before
public void setupJobs() throws IOException, InterruptedException {
SampleJobParameter jobParam1 = new SampleJobParameter(
"test-job-1",
"Test Job 1",
"test-index-1",
new IntervalSchedule(Instant.now(), 5, ChronoUnit.MINUTES),
30L,
0.1
);

SampleJobParameter jobParam2 = new SampleJobParameter(
"test-job-2",
"Test Job 2",
"test-index-2",
new IntervalSchedule(Instant.now(), 10, ChronoUnit.MINUTES),
60L,
0.2
);

SampleJobParameter jobParam3 = new SampleJobParameter(
"test-job-3",
"Test Job 3",
"test-index-3",
new CronSchedule("30 2 * * *", ZoneId.of("America/New_York")),
90L,
0.3
);

SampleJobParameter jobParam4 = new SampleJobParameter(
"test-job-4",
"Test Job 4",
"test-index-4",
new CronSchedule("0 9 * * MON", ZoneId.systemDefault()),
120L,
0.4
);

createWatcherJob("test-job-1", jobParam1);
createWatcherJob("test-job-2", jobParam2);
createWatcherJob("test-job-3", jobParam3);
createWatcherJob("test-job-4", jobParam4);
// Refresh indices to ensure all jobs are available
makeRequest(client(), "POST", "/_refresh", Collections.emptyMap(), null);

}

public void testGetScheduledJobInfoEntireCluster() throws IOException {

Response response = makeRequest(client(), "GET", "/_plugins/_job_scheduler/api/jobs", Collections.emptyMap(), null);

assertEquals(200, response.getStatusLine().getStatusCode());

Map<String, Object> responseJson = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
response.getEntity().getContent()
).map();

assertNotNull(responseJson);
assertTrue("Response should contain scheduled job information", responseJson.containsKey("jobs"));
assertEquals("Should have 4 total jobs", 4, responseJson.get("total_jobs"));

// Verify all test jobs are present
@SuppressWarnings("unchecked")
List<Map<String, Object>> jobs = (List<Map<String, Object>>) responseJson.get("jobs");
assertNotNull("Jobs list should not be null", jobs);
assertEquals("Should have 4 jobs in the list", 4, jobs.size());

// Check that all expected job IDs are present and validate job fields
Set<String> expectedJobIds = Set.of("test-job-1", "test-job-2", "test-job-3", "test-job-4");
Set<String> actualJobIds = new HashSet<>();
for (Map<String, Object> job : jobs) {
actualJobIds.add((String) job.get("job_id"));

// Validate required fields are present
assertEquals("job_type should be scheduler_sample_extension", "scheduler_sample_extension", job.get("job_type"));
assertNotNull("job_id should not be null", job.get("job_id"));
assertEquals("index_name should not be .scheduler_sample_extension", ".scheduler_sample_extension", job.get("index_name"));
assertNotNull("name should not be null", job.get("name"));
assertFalse("descheduled should be False", (Boolean) job.get("descheduled"));
assertTrue("enabled should be True", (Boolean) job.get("enabled"));
assertNotNull("enabled_time should not be null", job.get("enabled_time"));
assertNotNull("last_update_time should not be null", job.get("last_update_time"));
assertNotNull("schedule should not be null", job.get("schedule"));
assertTrue(job.get("lock_duration") instanceof Integer);
assertEquals("none", job.get("jitter"));
assertEquals("none", job.get("delay"));

// Validate schedule object
@SuppressWarnings("unchecked")
Map<String, Object> schedule = (Map<String, Object>) job.get("schedule");
assertTrue(
"schedule should be interval or Cron",
((schedule.get("type").equals("interval")) || (schedule.get("type").equals("cron")))
);
}
assertEquals("All expected job IDs should be present", expectedJobIds, actualJobIds);
}

public void testGetScheduledJobInfoByNode() throws IOException {

Response response = makeRequest(client(), "GET", "/_plugins/_job_scheduler/api/jobs?by_node", Collections.emptyMap(), null);

assertEquals(200, response.getStatusLine().getStatusCode());

Map<String, Object> responseJson = JsonXContent.jsonXContent.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
response.getEntity().getContent()
).map();

assertNotNull(responseJson);
assertTrue("Response should contain scheduled job information", responseJson.containsKey("nodes"));
assertEquals("Should have 4 total jobs", 4, responseJson.get("total_jobs"));

// Verify nodes array contains job information
@SuppressWarnings("unchecked")
List<Map<String, Object>> nodes = (List<Map<String, Object>>) responseJson.get("nodes");
assertNotNull("Nodes list should not be null", nodes);
assertFalse("Should have at least one node", nodes.isEmpty());

// Collect all job IDs across all nodes
Set<String> allJobIds = new HashSet<>();
for (Map<String, Object> node : nodes) {

@SuppressWarnings("unchecked")
List<Map<String, Object>> nodeJobs = (List<Map<String, Object>>) ((Map<String, Object>) node.get("scheduled_job_info")).get(
"jobs"
);
if (nodeJobs != null) {
for (Map<String, Object> job : nodeJobs) {
allJobIds.add((String) job.get("job_id"));
assertEquals("job_type should be scheduler_sample_extension", "scheduler_sample_extension", job.get("job_type"));
assertNotNull("job_id should not be null", job.get("job_id"));
assertEquals(
"index_name should not be .scheduler_sample_extension",
".scheduler_sample_extension",
job.get("index_name")
);
assertNotNull("name should not be null", job.get("name"));
assertFalse("descheduled should be False", (Boolean) job.get("descheduled"));
assertTrue("enabled should be True", (Boolean) job.get("enabled"));
assertNotNull("enabled_time should not be null", job.get("enabled_time"));
assertNotNull("last_update_time should not be null", job.get("last_update_time"));
assertNotNull("schedule should not be null", job.get("schedule"));
assertTrue(job.get("lock_duration") instanceof Integer);
assertEquals("none", job.get("jitter"));
assertEquals("none", job.get("delay"));
// Validate schedule object
@SuppressWarnings("unchecked")
Map<String, Object> schedule = (Map<String, Object>) job.get("schedule");
assertTrue(
"schedule should be interval or Cron",
((schedule.get("type").equals("interval")) || (schedule.get("type").equals("cron")))
);
}
}
}

java.util.Set<String> expectedJobIds = java.util.Set.of("test-job-1", "test-job-2", "test-job-3", "test-job-4");
assertEquals("All expected job IDs should be present across nodes", expectedJobIds, allJobIds);

// Validate job fields across all nodes

}

@After
public void cleanupJobs() throws IOException {
deleteWatcherJob("test-job-1");
deleteWatcherJob("test-job-2");
deleteWatcherJob("test-job-3");
deleteWatcherJob("test-job-4");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.jobscheduler.spi.schedule.CronSchedule;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.test.rest.OpenSearchRestTestCase;
Expand All @@ -48,6 +49,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Base64;
import java.util.Collections;
Expand Down Expand Up @@ -261,7 +263,11 @@ protected Map<String, String> getJobParameterAsMap(String jobId, SampleJobParame
params.put("id", jobId);
params.put("job_name", jobParameter.getName());
params.put("index", jobParameter.getIndexToWatch());
params.put("interval", String.valueOf(((IntervalSchedule) jobParameter.getSchedule()).getInterval()));
if (jobParameter.getSchedule() instanceof IntervalSchedule) {
params.put("interval", String.valueOf(((IntervalSchedule) jobParameter.getSchedule()).getInterval()));
} else if (jobParameter.getSchedule() instanceof CronSchedule) {
params.put("cron", ((CronSchedule) jobParameter.getSchedule()).getCronExpression());
}
params.put("lock_duration_seconds", String.valueOf(jobParameter.getLockDurationSeconds()));
return params;
}
Expand Down Expand Up @@ -297,13 +303,23 @@ protected SampleJobParameter getJobParameter(RestClient client, String jobId) th
jobParameter.setIndexToWatch(jobSource.get("index_name_to_watch").toString());

Map<String, Object> jobSchedule = (Map<String, Object>) jobSource.get("schedule");
jobParameter.setSchedule(
new IntervalSchedule(
Instant.ofEpochMilli(Long.parseLong(((Map<String, Object>) jobSchedule.get("interval")).get("start_time").toString())),
Integer.parseInt(((Map<String, Object>) jobSchedule.get("interval")).get("period").toString()),
ChronoUnit.MINUTES
)
);
if (jobSchedule.containsKey("cron")) {
jobParameter.setSchedule(
new CronSchedule(
((Map<String, Object>) jobSchedule.get("cron")).get("expression").toString(),
ZoneId.of(((Map<String, Object>) jobSchedule.get("cron")).get("timezone").toString())
)
);
} else {
jobParameter.setSchedule(
new IntervalSchedule(
Instant.ofEpochMilli(Long.parseLong(((Map<String, Object>) jobSchedule.get("interval")).get("start_time").toString())),
Integer.parseInt(((Map<String, Object>) jobSchedule.get("interval")).get("period").toString()),
ChronoUnit.MINUTES
)

);
}
jobParameter.setLockDurationSeconds(Long.parseLong(jobSource.get("lock_duration_seconds").toString()));
return jobParameter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* UnixCron {@link Schedule} implementation. Refer to https://en.wikipedia.org/wiki/Cron for cron syntax.
*/
public class CronSchedule implements Schedule {
static final String CRON_FIELD = "cron";
public static final String CRON_FIELD = "cron";
static final String EXPRESSION_FIELD = "expression";
static final String TIMEZONE_FIELD = "timezone";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
public class IntervalSchedule implements Schedule {

static final String START_TIME_FIELD = "start_time";
static final String INTERVAL_FIELD = "interval";
public static final String INTERVAL_FIELD = "interval";
static final String PERIOD_FIELD = "period";
static final String UNIT_FIELD = "unit";

Expand Down
Loading
Loading