Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.opensearch.core.common.io.stream.NamedWriteableRegistry
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.identity.PluginSubject
import org.opensearch.indices.SystemIndexDescriptor
import org.opensearch.notifications.action.CreateNotificationConfigAction
import org.opensearch.notifications.action.DeleteNotificationConfigAction
Expand All @@ -37,6 +38,7 @@ import org.opensearch.notifications.resthandler.NotificationChannelListRestHandl
import org.opensearch.notifications.resthandler.NotificationConfigRestHandler
import org.opensearch.notifications.resthandler.NotificationFeaturesRestHandler
import org.opensearch.notifications.resthandler.SendTestMessageRestHandler
import org.opensearch.notifications.security.PluginClient
import org.opensearch.notifications.security.UserAccessManager
import org.opensearch.notifications.send.SendMessageActionHelper
import org.opensearch.notifications.settings.PluginSettings
Expand All @@ -46,8 +48,8 @@ import org.opensearch.notifications.settings.PluginSettings.REMOTE_METADATA_SERV
import org.opensearch.notifications.settings.PluginSettings.REMOTE_METADATA_STORE_TYPE
import org.opensearch.notifications.spi.NotificationCore
import org.opensearch.notifications.spi.NotificationCoreExtension
import org.opensearch.notifications.util.SecureIndexClient
import org.opensearch.plugins.ActionPlugin
import org.opensearch.plugins.IdentityAwarePlugin
import org.opensearch.plugins.Plugin
import org.opensearch.plugins.SystemIndexPlugin
import org.opensearch.remote.metadata.client.impl.SdkClientFactory
Expand All @@ -69,9 +71,10 @@ import java.util.function.Supplier
* Entry point of the OpenSearch Notifications plugin
* This class initializes the rest handlers.
*/
class NotificationPlugin : ActionPlugin, Plugin(), NotificationCoreExtension, SystemIndexPlugin {
class NotificationPlugin : ActionPlugin, Plugin(), NotificationCoreExtension, SystemIndexPlugin, IdentityAwarePlugin {

lateinit var clusterService: ClusterService // initialized in createComponents()
lateinit var pluginClient: PluginClient

internal companion object {
private val log by logger(NotificationPlugin::class.java)
Expand Down Expand Up @@ -121,8 +124,9 @@ class NotificationPlugin : ActionPlugin, Plugin(), NotificationCoreExtension, Sy
log.debug("$LOG_PREFIX:createComponents")
this.clusterService = clusterService
val settings = environment.settings()
pluginClient = PluginClient(client)
val sdkClient = SdkClientFactory.createSdkClient(
SecureIndexClient(client),
pluginClient,
xContentRegistry,
mapOf(
REMOTE_METADATA_TYPE_KEY to REMOTE_METADATA_STORE_TYPE.get(settings),
Expand All @@ -134,10 +138,16 @@ class NotificationPlugin : ActionPlugin, Plugin(), NotificationCoreExtension, Sy
client.threadPool().executor(ThreadPool.Names.GENERIC)
)
PluginSettings.addSettingsUpdateConsumer(clusterService)
NotificationConfigIndex.initialize(sdkClient, client, clusterService)
NotificationConfigIndex.initialize(sdkClient, pluginClient, clusterService)
ConfigIndexingActions.initialize(NotificationConfigIndex, UserAccessManager)
SendMessageActionHelper.initialize(NotificationConfigIndex, UserAccessManager)
return listOf(sdkClient)
return listOf(sdkClient, pluginClient)
}

override fun assignSubject(pluginSubject: PluginSubject) {
// When security is not installed, the pluginSubject will still be assigned.
requireNotNull(pluginSubject)
pluginClient.setSubject(pluginSubject)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import org.opensearch.notifications.model.DocMetadata.Companion.METADATA_TAG
import org.opensearch.notifications.model.NotificationConfigDoc
import org.opensearch.notifications.model.NotificationConfigDocInfo
import org.opensearch.notifications.settings.PluginSettings
import org.opensearch.notifications.util.SecureIndexClient
import org.opensearch.notifications.util.SuspendUtils.Companion.suspendUntil
import org.opensearch.notifications.util.SuspendUtils.Companion.suspendUntilTimeout
import org.opensearch.remote.metadata.client.BulkDataObjectRequest
Expand Down Expand Up @@ -105,7 +104,7 @@ internal object NotificationConfigIndex : ConfigOperations {
* {@inheritDoc}
*/
fun initialize(sdkClient: SdkClient, client: Client, clusterService: ClusterService) {
NotificationConfigIndex.client = SecureIndexClient(client)
NotificationConfigIndex.client = client
NotificationConfigIndex.clusterService = clusterService
NotificationConfigIndex.sdkClient = sdkClient
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.notifications.security

import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionType
import org.opensearch.common.CheckedRunnable
import org.opensearch.core.action.ActionListener
import org.opensearch.core.action.ActionResponse
import org.opensearch.identity.Subject
import org.opensearch.transport.client.Client
import org.opensearch.transport.client.FilterClient

/**
* A special client for executing transport actions as this plugin's system subject.
*/
class PluginClient : FilterClient {
private var subject: Subject? = null

constructor(delegate: Client) : super(delegate)

constructor(delegate: Client, subject: Subject) : super(delegate) {
this.subject = subject
}

fun setSubject(subject: Subject) {
this.subject = subject
}

override fun <Request : ActionRequest?, Response : ActionResponse?> doExecute(
action: ActionType<Response?>?,
request: Request?,
listener: ActionListener<Response?>?
) {
checkNotNull(subject) { "PluginClient is not initialized." }
threadPool().getThreadContext().newStoredContext(false).use { ctx ->
subject!!.runAs<RuntimeException?>(
CheckedRunnable {
Companion.logger.info(
"Running transport action with subject: {}",
subject!!.getPrincipal().getName()
)
super.doExecute<Request?, Response?>(
action,
request,
ActionListener.runBefore<Response?>(listener, CheckedRunnable<RuntimeException> { ctx.restore() })
)
}
)
}
}

companion object {
private val logger: Logger = LogManager.getLogger(PluginClient::class.java)
}
}
Loading
Loading