Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
01b97b8
Set forbiddenApisTest to ignore failures
cliu123 Sep 10, 2025
173a598
Introduce a configurable remote metadata client
cliu123 Sep 10, 2025
436058a
Update tests
cliu123 Sep 11, 2025
5d887bc
resolving dependency conflicts, updating tests and formatting
cliu123 Sep 11, 2025
0ea04ff
Add UTs for the new remote metadata client settings
cliu123 Sep 11, 2025
b5f5f99
Resovle dependency conflicts
cliu123 Sep 12, 2025
6528277
Add more UT
cliu123 Sep 15, 2025
f5f386e
Set up remote metedata client integration tests with sample-extension…
cliu123 Sep 15, 2025
941d716
Separate test cluster for remote metadata tests
cliu123 Sep 15, 2025
1436910
Migrate job details service to use remote metadata sdk
cliu123 Sep 16, 2025
a5429f9
Revert "Migrate job details service to use remote metadata sdk"
cliu123 Sep 16, 2025
7154ca8
Remove SdkClient from JobDetailService
cliu123 Sep 18, 2025
d2bbbca
Remove sdkClient from JobDetailService
cliu123 Sep 18, 2025
3a44b49
Migrating LockServiceImpl to use sdkClient
cliu123 Sep 18, 2025
e24886d
Migrating from the OpenSearch Client to SdkClient causes finding lock…
cliu123 Sep 19, 2025
c76e36f
Fix integ tests of LockService
cliu123 Sep 21, 2025
5d894d4
Replacing the client with pluginClient upon sdkClient initialization
cliu123 Sep 22, 2025
42ca335
getting the slf version and jackson versioin from OpenSearch core ver…
cliu123 Sep 22, 2025
e279b9e
Revert the changes in sample-extension-plugin
cliu123 Sep 22, 2025
eb9c444
Address review comments
cliu123 Sep 23, 2025
df86c27
Fix typo
cliu123 Sep 24, 2025
cbe39f5
Resolve comments
cliu123 Sep 25, 2025
2b935b4
Skip lock index creation when multi-tenancy is enabled
cliu123 Sep 25, 2025
c00e15f
Remove unnecessary comment
cliu123 Sep 25, 2025
0cbf095
Update error messages
cliu123 Sep 25, 2025
ad89150
Make deleteLock respond false when deleteResponse is null
cliu123 Sep 25, 2025
750960a
Resolve comments
cliu123 Sep 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,34 @@ repositories {

dependencies {
implementation project(path: ":${rootProject.name}-spi", configuration: 'shadow')
implementation ("org.opensearch:opensearch-remote-metadata-sdk-ddb-client:${opensearch_build}")
implementation "org.opensearch:opensearch-remote-metadata-sdk:${opensearch_build}"
testImplementation group: 'org.mockito', name: 'mockito-core', version: "${versions.mockito}"

opensearchPlugin "org.opensearch.plugin:opensearch-security:${security_plugin_version}@zip"
}

def commonResolutionStrategy = {
force "org.slf4j:slf4j-api:${versions.slf4j}"
force "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
force "com.fasterxml.jackson.core:jackson-databind:${versions.jackson}"
force "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}"
force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${versions.jackson}"
force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}"
force "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${versions.jackson}"
force "commons-codec:commons-codec:${versions.commonscodec}"
force "org.apache.httpcomponents:httpcore:${versions.httpcore}"
force "jakarta.json:jakarta.json-api:2.1.3"
force "commons-logging:commons-logging:${versions.commonslogging}"
force "org.apache.httpcomponents.client5:httpclient5:${versions.httpclient5}"
}

configurations {
runtimeClasspath.resolutionStrategy(commonResolutionStrategy)
testRuntimeClasspath.resolutionStrategy(commonResolutionStrategy)
}

// RPM & Debian build
apply plugin: 'com.netflix.nebula.ospackage'

Expand Down Expand Up @@ -294,9 +317,9 @@ integTest {
}
}
Zip bundle = (Zip) project.getTasks().getByName("bundlePlugin");
integTest.getClusters().forEach{c -> {
integTest.getClusters().forEach{ c ->
c.plugin(project.getObjects().fileProperty().value(bundle.getArchiveFile()))
}}
}

testClusters.integTest {
testDistribution = 'INTEG_TEST'
Expand Down
4 changes: 2 additions & 2 deletions sample-extension-plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ integTest {

Zip bundle = (Zip) project.getTasks().getByName("bundlePlugin");
Zip rootBundle = (Zip) rootProject.getTasks().getByName("bundlePlugin");
integTest.getClusters().forEach{c -> {
integTest.getClusters().forEach{c ->
c.plugin(rootProject.getObjects().fileProperty().value(rootBundle.getArchiveFile()))
c.plugin(project.getObjects().fileProperty().value(bundle.getArchiveFile()))
}}
}

testClusters.integTest {
testDistribution = 'INTEG_TEST'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import org.opensearch.plugins.IdentityAwarePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SystemIndexPlugin;
import org.opensearch.remote.metadata.client.SdkClient;
import org.opensearch.remote.metadata.client.impl.SdkClientFactory;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.script.ScriptService;
Expand All @@ -70,14 +72,23 @@
import java.util.Map;
import java.util.Set;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.util.function.Supplier;

import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_ENDPOINT_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_REGION_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_SERVICE_NAME_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_TYPE_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.TENANT_AWARE_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.TENANT_ID_FIELD_KEY;

public class JobSchedulerPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin, SystemIndexPlugin, IdentityAwarePlugin {

public static final String OPEN_DISTRO_JOB_SCHEDULER_THREAD_POOL_NAME = "open_distro_job_scheduler";
public static final String JS_BASE_URI = "/_plugins/_job_scheduler";
public static final String TENANT_ID_FIELD = "tenant_id";

private static final Logger log = LogManager.getLogger(JobSchedulerPlugin.class);
private JobSweeper sweeper;
Expand All @@ -87,6 +98,7 @@ public class JobSchedulerPlugin extends Plugin implements ActionPlugin, Extensib
private Map<String, ScheduledJobProvider> indexToJobProviders;
private Set<String> indicesToListen;
private PluginClient pluginClient;
private SdkClient sdkClient;

private JobDetailsService jobDetailsService;

Expand Down Expand Up @@ -130,10 +142,36 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
Supplier<Boolean> statusHistoryEnabled = () -> JobSchedulerSettings.STATUS_HISTORY.get(environment.settings());
Settings settings = environment.settings();
Boolean isMultiTenancyEnabled = JobSchedulerSettings.JOB_SCHEDULER_MULTI_TENANCY_ENABLED.get(settings);
this.pluginClient = new PluginClient(client);

// Initialize SDK client for remote metadata storage
this.sdkClient = SdkClientFactory.createSdkClient(
pluginClient,
xContentRegistry,
isMultiTenancyEnabled
? Map.ofEntries(
Map.entry(REMOTE_METADATA_TYPE_KEY, JobSchedulerSettings.REMOTE_METADATA_TYPE.get(settings)),
Map.entry(REMOTE_METADATA_ENDPOINT_KEY, JobSchedulerSettings.REMOTE_METADATA_ENDPOINT.get(settings)),
Map.entry(REMOTE_METADATA_REGION_KEY, JobSchedulerSettings.REMOTE_METADATA_REGION.get(settings)),
Map.entry(REMOTE_METADATA_SERVICE_NAME_KEY, JobSchedulerSettings.REMOTE_METADATA_SERVICE_NAME.get(settings)),
Map.entry(TENANT_AWARE_KEY, "true"),
Map.entry(TENANT_ID_FIELD_KEY, TENANT_ID_FIELD)
)
: Collections.emptyMap()
);

Supplier<Boolean> statusHistoryEnabled = () -> JobSchedulerSettings.STATUS_HISTORY.get(environment.settings());
this.historyService = new JobHistoryService(pluginClient, clusterService);
this.lockService = new LockServiceImpl(pluginClient, clusterService, historyService, statusHistoryEnabled);
this.lockService = new LockServiceImpl(
pluginClient,
clusterService,
historyService,
statusHistoryEnabled,
this.sdkClient,
isMultiTenancyEnabled
);
this.jobDetailsService = new JobDetailsService(client, clusterService, this.indicesToListen, this.indexToJobProviders);
this.scheduler = new JobScheduler(threadPool, this.lockService);
this.sweeper = initSweeper(
Expand All @@ -149,7 +187,7 @@ public Collection<Object> createComponents(
clusterService.addListener(this.sweeper);
clusterService.addLifecycleListener(this.sweeper);

return List.of(this.lockService, this.scheduler, this.jobDetailsService, this.pluginClient);
return List.of(this.lockService, this.scheduler, this.jobDetailsService, this.pluginClient, this.sdkClient);
}

@Override
Expand All @@ -168,6 +206,11 @@ public List<Setting<?>> getSettings() {
settingList.add(JobSchedulerSettings.SWEEP_PERIOD);
settingList.add(JobSchedulerSettings.JITTER_LIMIT);
settingList.add(JobSchedulerSettings.STATUS_HISTORY);
settingList.add(JobSchedulerSettings.REMOTE_METADATA_TYPE);
settingList.add(JobSchedulerSettings.REMOTE_METADATA_ENDPOINT);
settingList.add(JobSchedulerSettings.REMOTE_METADATA_REGION);
settingList.add(JobSchedulerSettings.REMOTE_METADATA_SERVICE_NAME);
settingList.add(JobSchedulerSettings.JOB_SCHEDULER_MULTI_TENANCY_ENABLED);
return settingList;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;

import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_ENDPOINT_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_REGION_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_SERVICE_NAME_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_TYPE_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.TENANT_AWARE_KEY;

public class JobSchedulerSettings {
public static final Setting<TimeValue> REQUEST_TIMEOUT = Setting.positiveTimeSetting(
"plugins.jobscheduler.request_timeout",
Expand Down Expand Up @@ -60,4 +66,40 @@ public class JobSchedulerSettings {
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/** This setting sets the remote metadata type */
public static final Setting<String> REMOTE_METADATA_TYPE = Setting.simpleString(
"plugins.jobscheduler." + REMOTE_METADATA_TYPE_KEY,
Setting.Property.NodeScope,
Setting.Property.Final
);

/** This setting sets the remote metadata endpoint */
public static final Setting<String> REMOTE_METADATA_ENDPOINT = Setting.simpleString(
"plugins.jobscheduler." + REMOTE_METADATA_ENDPOINT_KEY,
Setting.Property.NodeScope,
Setting.Property.Final
);

/** This setting sets the remote metadata region */
public static final Setting<String> REMOTE_METADATA_REGION = Setting.simpleString(
"plugins.jobscheduler." + REMOTE_METADATA_REGION_KEY,
Setting.Property.NodeScope,
Setting.Property.Final
);

/** This setting sets the remote metadata service name */
public static final Setting<String> REMOTE_METADATA_SERVICE_NAME = Setting.simpleString(
"plugins.jobscheduler." + REMOTE_METADATA_SERVICE_NAME_KEY,
Setting.Property.NodeScope,
Setting.Property.Final
);

/** This setting enables multi-tenancy for job scheduler */
public static final Setting<Boolean> JOB_SCHEDULER_MULTI_TENANCY_ENABLED = Setting.boolSetting(
"plugins.jobscheduler." + TENANT_AWARE_KEY,
false,
Setting.Property.NodeScope,
Setting.Property.Final
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ public void processJobDetails(

/**
* Create Job details entry
* @param tempJobDetails new job details object that need to be inserted as document in the index=
* @param tempJobDetails new job details object that need to be inserted as document in the index
* @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the job details if it was created
* or else null.
*/
Expand Down
Loading
Loading