Skip to content

Commit af906fc

Browse files
[2.x] Adding new stop-replication action in ism
Signed-off-by: aggarwalShivani <[email protected]>
1 parent efb8cb6 commit af906fc

File tree

12 files changed

+505
-3
lines changed

12 files changed

+505
-3
lines changed

build.gradle

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ buildscript {
3535
job_scheduler_no_snapshot = opensearch_build
3636
notifications_no_snapshot = opensearch_build
3737
security_no_snapshot = opensearch_build
38+
ccr_no_snapshot = opensearch_build
3839
if (buildVersionQualifier) {
3940
opensearch_build += "-${buildVersionQualifier}"
4041
job_scheduler_no_snapshot += "-${buildVersionQualifier}"
@@ -66,6 +67,10 @@ buildscript {
6667
kotlin_version = System.getProperty("kotlin.version", "1.8.21")
6768

6869
security_plugin_version = System.getProperty("security.version", opensearch_build)
70+
ccr_version = System.getProperty("ccr.version", opensearch_build)
71+
ccr_build_download = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot +
72+
'/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-cross-cluster-replication-' + ccr_no_snapshot + '.zip'
73+
ccr_resource_folder = "src/test/resources/replication"
6974
}
7075

7176
repositories {
@@ -230,6 +235,7 @@ dependencies {
230235
opensearchPlugin "org.opensearch.plugin:opensearch-notifications-core:${notifications_version}@zip"
231236
opensearchPlugin "org.opensearch.plugin:notifications:${notifications_version}@zip"
232237
opensearchPlugin "org.opensearch.plugin:opensearch-security:${security_plugin_version}@zip"
238+
opensearchPlugin "org.opensearch.plugin:opensearch-cross-cluster-replication:${ccr_version}@zip"
233239
}
234240

235241
repositories {
@@ -313,6 +319,7 @@ def jobSchedulerFile = resolvePluginFile("opensearch-job-scheduler")
313319
def notificationsCoreFile = resolvePluginFile("opensearch-notifications-core")
314320
def notificationsFile = resolvePluginFile("notifications")
315321
def securityPluginFile = resolvePluginFile("opensearch-security")
322+
// Resolve the cross-cluster-replication plugin zip; the resolved file is what gets installed
// on the integTest cluster via plugin(provider(ccrFile)).
def ccrFile = resolvePluginFile("opensearch-cross-cluster-replication")
316323

317324
ext.getPluginResource = { download_to_folder, download_from_src ->
318325
def src_split = download_from_src.split("/")
@@ -393,6 +400,7 @@ testClusters.integTest {
393400
if (securityEnabled) {
394401
plugin(provider(securityPluginFile))
395402
}
403+
plugin(provider(ccrFile))
396404
setting 'path.repo', repo.absolutePath
397405
}
398406

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction
2323
import org.opensearch.indexmanagement.indexstatemanagement.action.RollupActionParser
2424
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkActionParser
2525
import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotActionParser
26+
import org.opensearch.indexmanagement.indexstatemanagement.action.StopReplicationActionParser
2627
import org.opensearch.indexmanagement.indexstatemanagement.action.TransformActionParser
2728
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
2829
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
@@ -51,6 +52,7 @@ class ISMActionsParser private constructor() {
5152
RolloverActionParser(),
5253
ShrinkActionParser(),
5354
SnapshotActionParser(),
55+
StopReplicationActionParser(),
5456
TransformActionParser(),
5557
)
5658

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.indexstatemanagement.action
7+
8+
import org.opensearch.indexmanagement.indexstatemanagement.step.stopreplication.AttemptStopReplicationStep
9+
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
10+
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
11+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
12+
13+
/**
 * ISM action to stop replication on indices replicated on a follower cluster.
 *
 * The action is made of exactly one step, [AttemptStopReplicationStep], which is created
 * once per action instance.
 *
 * @param index forwarded unchanged to the [Action] base constructor.
 */
class StopReplicationAction(
    index: Int,
) : Action(name, index) {
    private val attemptStopReplicationStep = AttemptStopReplicationStep()

    // Built once so every getSteps() call hands back the same list instance.
    private val steps = listOf(attemptStopReplicationStep)

    /** Always selects the single stop-replication step; [context] is not consulted. */
    override fun getStepToExecute(context: StepContext): Step {
        return attemptStopReplicationStep
    }

    /** Returns the one-element list holding the stop-replication step. */
    override fun getSteps(): List<Step> {
        return steps
    }

    companion object {
        const val name = "stop_replication"
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.indexstatemanagement.action
7+
8+
import org.opensearch.core.common.io.stream.StreamInput
9+
import org.opensearch.core.xcontent.XContentParser
10+
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken
11+
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
12+
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
13+
14+
/**
 * Parser for the `stop_replication` ISM action: builds [StopReplicationAction] instances
 * from either a [StreamInput] or an [XContentParser].
 */
class StopReplicationActionParser : ActionParser() {
    /** Reads a single int from [sin] and uses it as the index of the new [StopReplicationAction]. */
    override fun fromStreamInput(sin: StreamInput): Action = StopReplicationAction(sin.readInt())

    /**
     * Builds the action from [xcp]. No fields are read: the parser must currently sit on
     * START_OBJECT and its next token must be END_OBJECT, otherwise ensureExpectedToken rejects it.
     */
    override fun fromXContent(xcp: XContentParser, index: Int): Action {
        val opening = xcp.currentToken()
        ensureExpectedToken(XContentParser.Token.START_OBJECT, opening, xcp)
        // Only advance the parser after the opening token has been validated.
        val closing = xcp.nextToken()
        ensureExpectedToken(XContentParser.Token.END_OBJECT, closing, xcp)
        return StopReplicationAction(index)
    }

    /** The action type handled by this parser, i.e. [StopReplicationAction.name]. */
    override fun getActionType(): String {
        return StopReplicationAction.name
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.indexstatemanagement.step.stopreplication
7+
8+
import org.apache.logging.log4j.LogManager
9+
import org.opensearch.ExceptionsHelper
10+
import org.opensearch.action.support.master.AcknowledgedResponse
11+
import org.opensearch.commons.replication.ReplicationPluginInterface
12+
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
13+
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
14+
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
15+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
16+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
17+
import org.opensearch.snapshots.SnapshotInProgressException
18+
import org.opensearch.transport.RemoteTransportException
19+
20+
/**
 * ISM step that asks the replication plugin to stop replication for the managed index.
 *
 * The outcome of [execute] is kept in [stepStatus] and [info] and is surfaced through
 * [getUpdatedManagedIndexMetadata].
 */
class AttemptStopReplicationStep : Step(name) {
    private val logger = LogManager.getLogger(javaClass)
    private var stepStatus = StepStatus.STARTING
    private var info: Map<String, Any>? = null
    private var replicationPluginInterface: ReplicationPluginInterface = ReplicationPluginInterface()

    /** Replaces the [ReplicationPluginInterface] instance that [execute] calls. */
    fun setReplicationPluginInterface(replicationPluginInterface: ReplicationPluginInterface) {
        this.replicationPluginInterface = replicationPluginInterface
    }

    /**
     * Sends a [StopIndexReplicationRequest] for the managed index and records the result.
     *
     * Does nothing when no step context is set. An acknowledged response marks the step
     * COMPLETED; an unacknowledged response or an exception is recorded in [stepStatus]
     * and [info] rather than being propagated.
     *
     * @return this step, so the caller can read the updated metadata from it.
     */
    override suspend fun execute(): Step {
        val context = this.context ?: return this
        val indexName = context.metadata.index
        try {
            val stopIndexReplicationRequestObj = StopIndexReplicationRequest(indexName)
            val response: AcknowledgedResponse = context.client.admin().indices().suspendUntil {
                replicationPluginInterface.stopReplication(
                    context.client,
                    stopIndexReplicationRequestObj,
                    it,
                )
            }
            if (response.isAcknowledged) {
                stepStatus = StepStatus.COMPLETED
                info = mapOf("message" to getSuccessMessage(indexName))
            } else {
                val message = getFailedMessage(indexName)
                logger.warn(message)
                stepStatus = StepStatus.FAILED
                info = mapOf("message" to message)
            }
        } catch (e: RemoteTransportException) {
            // Look through the transport wrapper so the real failure decides how we react.
            when (val cause = ExceptionsHelper.unwrapCause(e)) {
                is SnapshotInProgressException -> handleSnapshotException(indexName, cause)
                is Exception -> handleException(indexName, cause)
                // unwrapCause returns a Throwable; fall back to the wrapper instead of an unchecked cast.
                else -> handleException(indexName, e)
            }
        } catch (e: SnapshotInProgressException) {
            handleSnapshotException(indexName, e)
        } catch (e: Exception) {
            handleException(indexName, e)
        }
        return this
    }

    /**
     * Records that a snapshot was in progress for the index.
     *
     * The status is CONDITION_NOT_MET rather than FAILED so that it agrees with the
     * "retrying" wording of the recorded message.
     * NOTE(review): this relies on ISM re-running steps that report CONDITION_NOT_MET - confirm.
     */
    private fun handleSnapshotException(indexName: String, e: SnapshotInProgressException) {
        val message = getSnapshotMessage(indexName)
        logger.error(message, e)
        stepStatus = StepStatus.CONDITION_NOT_MET
        info = mapOf("message" to message)
    }

    /** Marks the step FAILED and stores the failure message plus, when available, the exception message as "cause". */
    private fun handleException(indexName: String, e: Exception) {
        val message = getFailedMessage(indexName)
        logger.error(message, e)
        stepStatus = StepStatus.FAILED
        val mutableInfo = mutableMapOf("message" to message)
        val errorMessage = e.message
        if (errorMessage != null) mutableInfo["cause"] = errorMessage
        info = mutableInfo.toMap()
    }

    /** Copies [currentMetadata] with this step's name, start time, status and info; clears transitionTo. */
    override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData = currentMetadata.copy(
        stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
        transitionTo = null,
        info = info,
    )

    override fun isIdempotent() = false

    companion object {
        const val name = "attempt_stop_replication"

        fun getFailedMessage(index: String) = "Failed to stop replication [index=$index]"

        fun getSuccessMessage(index: String) = "Successfully stopped replication [index=$index]"

        fun getSnapshotMessage(index: String) = "Index had snapshot in progress, retrying stop replication [index=$index]"
    }
}

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ActionValidation.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class ActionValidation(
3333
"transition" -> ValidateTransition(settings, clusterService, jvmService).execute(indexName)
3434
"close" -> ValidateClose(settings, clusterService, jvmService).execute(indexName)
3535
"index_priority" -> ValidateIndexPriority(settings, clusterService, jvmService).execute(indexName)
36+
"stop_replication" -> ValidateStopReplication(settings, clusterService, jvmService).execute(indexName)
3637
// No validations for these actions at current stage.
3738
// Reason: https://github.com/opensearch-project/index-management/issues/587
3839
"notification" -> ValidateNothing(settings, clusterService, jvmService).execute(indexName)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.indexstatemanagement.validation
7+
8+
import org.apache.logging.log4j.LogManager
9+
import org.opensearch.cluster.metadata.MetadataCreateIndexService
10+
import org.opensearch.cluster.service.ClusterService
11+
import org.opensearch.common.settings.Settings
12+
import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate
13+
import org.opensearch.indexmanagement.util.OpenForTesting
14+
import org.opensearch.indices.InvalidIndexNameException
15+
import org.opensearch.monitor.jvm.JvmService
16+
17+
/**
 * Validation run for the `stop_replication` ISM action.
 *
 * Validation fails when the index is missing from the cluster state metadata or when its
 * name is rejected by [MetadataCreateIndexService.validateIndexOrAliasName]; in both cases
 * a message describing the problem is stored in validationMessage.
 */
@OpenForTesting
class ValidateStopReplication(
    settings: Settings,
    clusterService: ClusterService,
    jvmService: JvmService,
) : Validate(settings, clusterService, jvmService) {
    private val logger = LogManager.getLogger(javaClass)

    /**
     * Runs the checks for [indexName] and returns this validator carrying the outcome.
     * The name check is only evaluated when the index exists.
     */
    @Suppress("ReturnSuppressCount", "ReturnCount")
    override fun execute(indexName: String): Validate {
        // Each failing check has already stored its own validationMessage.
        if (indexExists(indexName) && validIndex(indexName)) {
            validationMessage = getValidationPassedMessage(indexName)
        } else {
            validationStatus = ValidationStatus.FAILED
        }
        return this
    }

    /** True when the cluster state metadata contains [indexName]; otherwise logs and records the "no such index" message. */
    private fun indexExists(indexName: String): Boolean {
        val present = clusterService.state().metadata.indices.containsKey(indexName)
        if (present) {
            return true
        }
        val message = getNoIndexMessage(indexName)
        logger.warn(message)
        validationMessage = message
        return false
    }

    /**
     * True when [indexName] passes index/alias name validation.
     * Any exception raised by the validation is caught and replaced by the generic
     * "not valid" message; the exception's own reason is not recorded here.
     */
    private fun validIndex(indexName: String): Boolean {
        val exceptionGenerator: (String, String) -> RuntimeException = { invalidName, reason ->
            InvalidIndexNameException(invalidName, reason)
        }
        return try {
            MetadataCreateIndexService.validateIndexOrAliasName(indexName, exceptionGenerator)
            true
        } catch (e: Exception) {
            val message = getIndexNotValidMessage(indexName)
            logger.warn(message)
            validationMessage = message
            false
        }
    }

    @Suppress("TooManyFunctions")
    companion object {
        const val name = "validate_stop_replication"

        fun getNoIndexMessage(index: String) = "No such index [index=$index] for stop replication action."

        fun getIndexNotValidMessage(index: String) = "Index [index=$index] is not valid. Abort stop replication action on it."

        fun getValidationPassedMessage(index: String) = "Stop replication action validation passed for [index=$index]"
    }
}

src/main/resources/mappings/opendistro-ism-config.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"_meta" : {
3-
"schema_version": 21
3+
"schema_version": 22
44
},
55
"dynamic": "strict",
66
"properties": {
@@ -170,6 +170,9 @@
170170
"delete": {
171171
"type": "object"
172172
},
173+
"stop_replication": {
174+
"type": "object"
175+
},
173176
"force_merge": {
174177
"properties": {
175178
"max_num_segments": {

src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ import javax.management.remote.JMXServiceURL
4343

4444
abstract class IndexManagementRestTestCase : ODFERestTestCase() {
4545

46-
val configSchemaVersion = 21
46+
val configSchemaVersion = 22
4747
val historySchemaVersion = 7
4848

4949
// Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.indexstatemanagement.action
7+
8+
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
9+
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
10+
import org.opensearch.indexmanagement.indexstatemanagement.model.State
11+
import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification
12+
import org.opensearch.indexmanagement.waitFor
13+
import java.time.Instant
14+
import java.time.temporal.ChronoUnit
15+
import java.util.Locale
16+
17+
/** REST integration test for the `stop_replication` ISM action. */
class StopReplicationActionIT : IndexStateManagementRestTestCase() {
    private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT)

    /**
     * Applies a policy whose only state runs stop_replication to a newly created index and
     * waits for the explain metadata to report the "No replication in progress" cause.
     */
    fun `test stop_replication on a non-replicated index`() {
        val indexName = "${testIndexName}_index_1"
        val policyID = "${testIndexName}_testPolicyName_1"

        // Single-state policy whose only action is stop_replication.
        val stopReplicationState = State("StopReplicationState", listOf(StopReplicationAction(0)), listOf())
        val states = listOf(stopReplicationState)
        val policy = Policy(
            id = policyID,
            description = "$testIndexName description",
            schemaVersion = 1L,
            lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
            errorNotification = randomErrorNotification(),
            defaultState = stopReplicationState.name,
            states = states,
        )
        createPolicy(policy, policyID)
        createIndex(indexName, policyID)

        val managedIndexConfig = getExistingManagedIndexConfig(indexName)

        // First cycle: change the start time so the job will trigger in 2 seconds.
        updateManagedIndexConfigStartTime(managedIndexConfig)
        waitFor {
            val explained = getExplainManagedIndexMetaData(indexName)
            assertEquals(policyID, explained.policyID)
        }

        // Second cycle: trigger the job again, then wait for the failure cause to show up.
        updateManagedIndexConfigStartTime(managedIndexConfig)
        val expectedCause = "cause=No replication in progress for index:" + indexName
        waitFor {
            val metadataInfo = getExplainManagedIndexMetaData(indexName).info.toString()
            assertTrue(metadataInfo.contains(expectedCause))
        }
    }
}

0 commit comments

Comments
 (0)