Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 2 additions & 25 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -241,34 +241,11 @@ repositories {

dependencies {
implementation project(path: ":${rootProject.name}-spi", configuration: 'shadow')
implementation ("org.opensearch:opensearch-remote-metadata-sdk-ddb-client:${opensearch_build}")
implementation "org.opensearch:opensearch-remote-metadata-sdk:${opensearch_build}"
testImplementation group: 'org.mockito', name: 'mockito-core', version: "${versions.mockito}"

opensearchPlugin "org.opensearch.plugin:opensearch-security:${security_plugin_version}@zip"
}

def commonResolutionStrategy = {
force "org.slf4j:slf4j-api:${versions.slf4j}"
force "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
force "com.fasterxml.jackson.core:jackson-databind:${versions.jackson}"
force "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}"
force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${versions.jackson}"
force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}"
force "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:${versions.jackson}"
force "commons-codec:commons-codec:${versions.commonscodec}"
force "org.apache.httpcomponents:httpcore:${versions.httpcore}"
force "jakarta.json:jakarta.json-api:2.1.3"
force "commons-logging:commons-logging:${versions.commonslogging}"
force "org.apache.httpcomponents.client5:httpclient5:${versions.httpclient5}"
}

configurations {
runtimeClasspath.resolutionStrategy(commonResolutionStrategy)
testRuntimeClasspath.resolutionStrategy(commonResolutionStrategy)
}

// RPM & Debian build
apply plugin: 'com.netflix.nebula.ospackage'

Expand Down Expand Up @@ -317,9 +294,9 @@ integTest {
}
}
Zip bundle = (Zip) project.getTasks().getByName("bundlePlugin");
integTest.getClusters().forEach{ c ->
integTest.getClusters().forEach{c -> {
c.plugin(project.getObjects().fileProperty().value(bundle.getArchiveFile()))
}
}}

testClusters.integTest {
testDistribution = 'INTEG_TEST'
Expand Down
4 changes: 2 additions & 2 deletions sample-extension-plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ integTest {

Zip bundle = (Zip) project.getTasks().getByName("bundlePlugin");
Zip rootBundle = (Zip) rootProject.getTasks().getByName("bundlePlugin");
integTest.getClusters().forEach{c ->
integTest.getClusters().forEach{c -> {
c.plugin(rootProject.getObjects().fileProperty().value(rootBundle.getArchiveFile()))
c.plugin(project.getObjects().fileProperty().value(bundle.getArchiveFile()))
}
}}

testClusters.integTest {
testDistribution = 'INTEG_TEST'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@
import org.opensearch.plugins.IdentityAwarePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SystemIndexPlugin;
import org.opensearch.remote.metadata.client.SdkClient;
import org.opensearch.remote.metadata.client.impl.SdkClientFactory;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.script.ScriptService;
Expand All @@ -72,23 +70,14 @@
import java.util.Map;
import java.util.Set;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.util.function.Supplier;

import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_ENDPOINT_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_REGION_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_SERVICE_NAME_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_TYPE_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.TENANT_AWARE_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.TENANT_ID_FIELD_KEY;

public class JobSchedulerPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin, SystemIndexPlugin, IdentityAwarePlugin {

public static final String OPEN_DISTRO_JOB_SCHEDULER_THREAD_POOL_NAME = "open_distro_job_scheduler";
public static final String JS_BASE_URI = "/_plugins/_job_scheduler";
public static final String TENANT_ID_FIELD = "tenant_id";

private static final Logger log = LogManager.getLogger(JobSchedulerPlugin.class);
private JobSweeper sweeper;
Expand All @@ -98,7 +87,6 @@ public class JobSchedulerPlugin extends Plugin implements ActionPlugin, Extensib
private Map<String, ScheduledJobProvider> indexToJobProviders;
private Set<String> indicesToListen;
private PluginClient pluginClient;
private SdkClient sdkClient;

private JobDetailsService jobDetailsService;

Expand Down Expand Up @@ -142,36 +130,10 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
Settings settings = environment.settings();
Boolean isMultiTenancyEnabled = JobSchedulerSettings.JOB_SCHEDULER_MULTI_TENANCY_ENABLED.get(settings);
this.pluginClient = new PluginClient(client);

// Initialize SDK client for remote metadata storage
this.sdkClient = SdkClientFactory.createSdkClient(
pluginClient,
xContentRegistry,
isMultiTenancyEnabled
? Map.ofEntries(
Map.entry(REMOTE_METADATA_TYPE_KEY, JobSchedulerSettings.REMOTE_METADATA_TYPE.get(settings)),
Map.entry(REMOTE_METADATA_ENDPOINT_KEY, JobSchedulerSettings.REMOTE_METADATA_ENDPOINT.get(settings)),
Map.entry(REMOTE_METADATA_REGION_KEY, JobSchedulerSettings.REMOTE_METADATA_REGION.get(settings)),
Map.entry(REMOTE_METADATA_SERVICE_NAME_KEY, JobSchedulerSettings.REMOTE_METADATA_SERVICE_NAME.get(settings)),
Map.entry(TENANT_AWARE_KEY, "true"),
Map.entry(TENANT_ID_FIELD_KEY, TENANT_ID_FIELD)
)
: Collections.emptyMap()
);

Supplier<Boolean> statusHistoryEnabled = () -> JobSchedulerSettings.STATUS_HISTORY.get(environment.settings());
this.pluginClient = new PluginClient(client);
this.historyService = new JobHistoryService(pluginClient, clusterService);
this.lockService = new LockServiceImpl(
pluginClient,
clusterService,
historyService,
statusHistoryEnabled,
this.sdkClient,
isMultiTenancyEnabled
);
this.lockService = new LockServiceImpl(pluginClient, clusterService, historyService, statusHistoryEnabled);
this.jobDetailsService = new JobDetailsService(client, clusterService, this.indicesToListen, this.indexToJobProviders);
this.scheduler = new JobScheduler(threadPool, this.lockService);
this.sweeper = initSweeper(
Expand All @@ -187,7 +149,7 @@ public Collection<Object> createComponents(
clusterService.addListener(this.sweeper);
clusterService.addLifecycleListener(this.sweeper);

return List.of(this.lockService, this.scheduler, this.jobDetailsService, this.pluginClient, this.sdkClient);
return List.of(this.lockService, this.scheduler, this.jobDetailsService, this.pluginClient);
}

@Override
Expand All @@ -206,11 +168,6 @@ public List<Setting<?>> getSettings() {
settingList.add(JobSchedulerSettings.SWEEP_PERIOD);
settingList.add(JobSchedulerSettings.JITTER_LIMIT);
settingList.add(JobSchedulerSettings.STATUS_HISTORY);
settingList.add(JobSchedulerSettings.REMOTE_METADATA_TYPE);
settingList.add(JobSchedulerSettings.REMOTE_METADATA_ENDPOINT);
settingList.add(JobSchedulerSettings.REMOTE_METADATA_REGION);
settingList.add(JobSchedulerSettings.REMOTE_METADATA_SERVICE_NAME);
settingList.add(JobSchedulerSettings.JOB_SCHEDULER_MULTI_TENANCY_ENABLED);
return settingList;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;

import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_ENDPOINT_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_REGION_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_SERVICE_NAME_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.REMOTE_METADATA_TYPE_KEY;
import static org.opensearch.remote.metadata.common.CommonValue.TENANT_AWARE_KEY;

public class JobSchedulerSettings {
public static final Setting<TimeValue> REQUEST_TIMEOUT = Setting.positiveTimeSetting(
"plugins.jobscheduler.request_timeout",
Expand Down Expand Up @@ -66,40 +60,4 @@ public class JobSchedulerSettings {
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/** This setting sets the remote metadata type */
public static final Setting<String> REMOTE_METADATA_TYPE = Setting.simpleString(
"plugins.jobscheduler." + REMOTE_METADATA_TYPE_KEY,
Setting.Property.NodeScope,
Setting.Property.Final
);

/** This setting sets the remote metadata endpoint */
public static final Setting<String> REMOTE_METADATA_ENDPOINT = Setting.simpleString(
"plugins.jobscheduler." + REMOTE_METADATA_ENDPOINT_KEY,
Setting.Property.NodeScope,
Setting.Property.Final
);

/** This setting sets the remote metadata region */
public static final Setting<String> REMOTE_METADATA_REGION = Setting.simpleString(
"plugins.jobscheduler." + REMOTE_METADATA_REGION_KEY,
Setting.Property.NodeScope,
Setting.Property.Final
);

/** This setting sets the remote metadata service name */
public static final Setting<String> REMOTE_METADATA_SERVICE_NAME = Setting.simpleString(
"plugins.jobscheduler." + REMOTE_METADATA_SERVICE_NAME_KEY,
Setting.Property.NodeScope,
Setting.Property.Final
);

/** This setting enables multi-tenancy for job scheduler */
public static final Setting<Boolean> JOB_SCHEDULER_MULTI_TENANCY_ENABLED = Setting.boolSetting(
"plugins.jobscheduler." + TENANT_AWARE_KEY,
false,
Setting.Property.NodeScope,
Setting.Property.Final
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ public void processJobDetails(

/**
* Create Job details entry
* @param tempJobDetails new job details object that need to be inserted as document in the index
* @param tempJobDetails new job details object that need to be inserted as document in the index=
* @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the job details if it was created
* or else null.
*/
Expand Down
Loading
Loading