Skip to content

Commit 737faa5

Browse files
Adding unfollow action in ism to invoke stop replication for ccr (#1198)
* Adding unfollow action in ism to invoke stop replication for ccr Signed-off-by: aggarwalShivani <[email protected]> * Adding unfollow action in ism to invoke stop replication for ccr Signed-off-by: aggarwalShivani <[email protected]> * Renamed unfollow to stop replication action Signed-off-by: aggarwalShivani <[email protected]> * Renamed unfollow to stop_replication action Signed-off-by: aggarwalShivani <[email protected]> * Renamed unfollow to stop_replication action and addressed comments Signed-off-by: aggarwalShivani <[email protected]> --------- Signed-off-by: aggarwalShivani <[email protected]> Signed-off-by: aggarwalShivani <[email protected]>
1 parent 701fe1b commit 737faa5

File tree

13 files changed

+468
-4
lines changed

13 files changed

+468
-4
lines changed

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@ src/test/resources/job-scheduler/
1515
src/test/resources/bwc/
1616
bin/
1717
spi/bin/
18-
src/test/resources/notifications*
18+
src/test/resources/notifications*
19+

build.gradle

+4
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ buildscript {
6666
kotlin_version = System.getProperty("kotlin.version", "1.9.25")
6767

6868
security_plugin_version = System.getProperty("security.version", opensearch_build)
69+
ccr_version = System.getProperty("ccr.version", opensearch_build)
6970
}
7071

7172
repositories {
@@ -246,6 +247,7 @@ dependencies {
246247
opensearchPlugin "org.opensearch.plugin:opensearch-notifications-core:${notifications_version}@zip"
247248
opensearchPlugin "org.opensearch.plugin:notifications:${notifications_version}@zip"
248249
opensearchPlugin "org.opensearch.plugin:opensearch-security:${security_plugin_version}@zip"
250+
opensearchPlugin "org.opensearch.plugin:opensearch-cross-cluster-replication:${ccr_version}@zip"
249251
}
250252

251253
repositories {
@@ -329,6 +331,7 @@ def jobSchedulerFile = resolvePluginFile("opensearch-job-scheduler")
329331
def notificationsCoreFile = resolvePluginFile("opensearch-notifications-core")
330332
def notificationsFile = resolvePluginFile("notifications")
331333
def securityPluginFile = resolvePluginFile("opensearch-security")
334+
def ccrFile = resolvePluginFile("opensearch-cross-cluster-replication")
332335

333336
ext.getPluginResource = { download_to_folder, download_from_src ->
334337
def src_split = download_from_src.split("/")
@@ -409,6 +412,7 @@ testClusters.integTest {
409412
if (securityEnabled) {
410413
plugin(provider(securityPluginFile))
411414
}
415+
plugin(provider(ccrFile))
412416
setting 'path.repo', repo.absolutePath
413417
}
414418

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ISMActionsParser.kt

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction
2424
import org.opensearch.indexmanagement.indexstatemanagement.action.RollupActionParser
2525
import org.opensearch.indexmanagement.indexstatemanagement.action.ShrinkActionParser
2626
import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotActionParser
27+
import org.opensearch.indexmanagement.indexstatemanagement.action.StopReplicationActionParser
2728
import org.opensearch.indexmanagement.indexstatemanagement.action.TransformActionParser
2829
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
2930
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
@@ -53,6 +54,7 @@ class ISMActionsParser private constructor() {
5354
ShrinkActionParser(),
5455
SnapshotActionParser(),
5556
TransformActionParser(),
57+
StopReplicationActionParser(),
5658
ConvertIndexToRemoteActionParser(),
5759
)
5860

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.indexstatemanagement.action
7+
8+
import org.opensearch.indexmanagement.indexstatemanagement.step.stopreplication.AttemptStopReplicationStep
9+
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
10+
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
11+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
12+
13+
/**
14+
* ISM action to stop replication on indices replicated on a follower cluster.
15+
*/
16+
class StopReplicationAction(
17+
index: Int,
18+
) : Action(name, index) {
19+
companion object {
20+
const val name = "stop_replication"
21+
}
22+
23+
private val attemptStopReplicationStep = AttemptStopReplicationStep()
24+
25+
private val steps = listOf(attemptStopReplicationStep)
26+
27+
override fun getStepToExecute(context: StepContext): Step = attemptStopReplicationStep
28+
29+
override fun getSteps(): List<Step> = steps
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.indexstatemanagement.action
7+
8+
import org.opensearch.core.common.io.stream.StreamInput
9+
import org.opensearch.core.xcontent.XContentParser
10+
import org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken
11+
import org.opensearch.indexmanagement.spi.indexstatemanagement.Action
12+
import org.opensearch.indexmanagement.spi.indexstatemanagement.ActionParser
13+
14+
class StopReplicationActionParser : ActionParser() {
15+
override fun fromStreamInput(sin: StreamInput): Action {
16+
val index = sin.readInt()
17+
return StopReplicationAction(index)
18+
}
19+
20+
override fun fromXContent(xcp: XContentParser, index: Int): Action {
21+
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
22+
ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp)
23+
24+
return StopReplicationAction(index)
25+
}
26+
27+
override fun getActionType(): String = StopReplicationAction.name
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.indexstatemanagement.step.stopreplication
7+
8+
import org.apache.logging.log4j.LogManager
9+
import org.opensearch.ExceptionsHelper
10+
import org.opensearch.action.support.clustermanager.AcknowledgedResponse
11+
import org.opensearch.commons.replication.ReplicationPluginInterface
12+
import org.opensearch.commons.replication.action.StopIndexReplicationRequest
13+
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
14+
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
15+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
16+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
17+
import org.opensearch.snapshots.SnapshotInProgressException
18+
import org.opensearch.transport.RemoteTransportException
19+
20+
class AttemptStopReplicationStep : Step(name) {
21+
private val logger = LogManager.getLogger(javaClass)
22+
private var stepStatus = StepStatus.STARTING
23+
private var info: Map<String, Any>? = null
24+
25+
override suspend fun execute(): Step {
26+
val context = this.context ?: return this
27+
val indexName = context.metadata.index
28+
try {
29+
val stopIndexReplicationRequestObj = StopIndexReplicationRequest(indexName)
30+
val response: AcknowledgedResponse = context.client.suspendUntil {
31+
ReplicationPluginInterface.stopReplication(
32+
context.client,
33+
stopIndexReplicationRequestObj,
34+
it,
35+
)
36+
}
37+
if (response.isAcknowledged) {
38+
stepStatus = StepStatus.COMPLETED
39+
info = mapOf("message" to getSuccessMessage(indexName))
40+
} else {
41+
val message = getFailedMessage(indexName)
42+
logger.warn(message)
43+
stepStatus = StepStatus.FAILED
44+
info = mapOf("message" to message)
45+
}
46+
} catch (e: RemoteTransportException) {
47+
val cause = ExceptionsHelper.unwrapCause(e)
48+
if (cause is SnapshotInProgressException) {
49+
handleSnapshotException(indexName, cause)
50+
} else {
51+
handleException(indexName, cause as Exception)
52+
}
53+
} catch (e: SnapshotInProgressException) {
54+
handleSnapshotException(indexName, e)
55+
} catch (e: Exception) {
56+
handleException(indexName, e)
57+
}
58+
return this
59+
}
60+
61+
private fun handleSnapshotException(indexName: String, e: SnapshotInProgressException) {
62+
val message = getSnapshotMessage(indexName)
63+
logger.error(message, e)
64+
stepStatus = StepStatus.FAILED
65+
info = mapOf("message" to message)
66+
}
67+
68+
private fun handleException(indexName: String, e: Exception) {
69+
val message = getFailedMessage(indexName)
70+
logger.error(message, e)
71+
stepStatus = StepStatus.FAILED
72+
val mutableInfo = mutableMapOf("message" to message)
73+
val errorMessage = e.message
74+
if (errorMessage != null) mutableInfo["cause"] = errorMessage
75+
info = mutableInfo.toMap()
76+
}
77+
78+
override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData = currentMetadata.copy(
79+
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
80+
transitionTo = null,
81+
info = info,
82+
)
83+
84+
override fun isIdempotent() = false
85+
86+
companion object {
87+
const val name = "attempt_stop_replication"
88+
89+
fun getFailedMessage(index: String) = "Failed to stop replication [index=$index]"
90+
91+
fun getSuccessMessage(index: String) = "Successfully stopped replication [index=$index]"
92+
93+
fun getSnapshotMessage(index: String) = "Index had snapshot in progress, retrying stop replication [index=$index]"
94+
}
95+
}

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ActionValidation.kt

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class ActionValidation(
3434
"transition" -> ValidateTransition(settings, clusterService, jvmService).execute(indexName)
3535
"close" -> ValidateClose(settings, clusterService, jvmService).execute(indexName)
3636
"index_priority" -> ValidateIndexPriority(settings, clusterService, jvmService).execute(indexName)
37+
"stop_replication" -> ValidateStopReplication(settings, clusterService, jvmService).execute(indexName)
3738
// No validations for these actions at current stage.
3839
// Reason: https://github.com/opensearch-project/index-management/issues/587
3940
"notification" -> ValidateNothing(settings, clusterService, jvmService).execute(indexName)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.indexstatemanagement.validation
7+
8+
import org.apache.logging.log4j.LogManager
9+
import org.opensearch.cluster.metadata.MetadataCreateIndexService
10+
import org.opensearch.cluster.service.ClusterService
11+
import org.opensearch.common.settings.Settings
12+
import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate
13+
import org.opensearch.indexmanagement.util.OpenForTesting
14+
import org.opensearch.indices.InvalidIndexNameException
15+
import org.opensearch.monitor.jvm.JvmService
16+
17+
@OpenForTesting
18+
class ValidateStopReplication(
19+
settings: Settings,
20+
clusterService: ClusterService,
21+
jvmService: JvmService,
22+
) : Validate(settings, clusterService, jvmService) {
23+
private val logger = LogManager.getLogger(javaClass)
24+
25+
@Suppress("ReturnSuppressCount", "ReturnCount")
26+
override fun execute(indexName: String): Validate {
27+
// if these conditions are false, fail validation and do not execute stop_replication action
28+
if (!indexExists(indexName) || !validIndex(indexName)) {
29+
validationStatus = ValidationStatus.FAILED
30+
return this
31+
}
32+
validationMessage = getValidationPassedMessage(indexName)
33+
return this
34+
}
35+
36+
private fun indexExists(indexName: String): Boolean {
37+
val isIndexExists = clusterService.state().metadata.indices.containsKey(indexName)
38+
if (!isIndexExists) {
39+
val message = getNoIndexMessage(indexName)
40+
logger.warn(message)
41+
validationMessage = message
42+
return false
43+
}
44+
return true
45+
}
46+
47+
private fun validIndex(indexName: String): Boolean {
48+
val exceptionGenerator: (String, String) -> RuntimeException = { index_name, reason -> InvalidIndexNameException(index_name, reason) }
49+
// If the index name is invalid for any reason, this will throw an exception giving the reason why in the message.
50+
// That will be displayed to the user as the cause.
51+
try {
52+
MetadataCreateIndexService.validateIndexOrAliasName(indexName, exceptionGenerator)
53+
} catch (e: Exception) {
54+
val message = getIndexNotValidMessage(indexName)
55+
logger.warn(message)
56+
validationMessage = message
57+
return false
58+
}
59+
return true
60+
}
61+
62+
@Suppress("TooManyFunctions")
63+
companion object {
64+
const val name = "validate_stop_replication"
65+
66+
fun getNoIndexMessage(index: String) = "No such index [index=$index] for stop replication action."
67+
68+
fun getIndexNotValidMessage(index: String) = "Index [index=$index] is not valid. Abort stop replication action on it."
69+
70+
fun getValidationPassedMessage(index: String) = "Stop replication action validation passed for [index=$index]"
71+
}
72+
}

src/main/resources/mappings/opendistro-ism-config.json

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"_meta" : {
3-
"schema_version": 23
3+
"schema_version": 24
44
},
55
"dynamic": "strict",
66
"properties": {
@@ -170,6 +170,9 @@
170170
"delete": {
171171
"type": "object"
172172
},
173+
"stop_replication": {
174+
"type": "object"
175+
},
173176
"force_merge": {
174177
"properties": {
175178
"max_num_segments": {

src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import javax.management.remote.JMXConnectorFactory
4141
import javax.management.remote.JMXServiceURL
4242

4343
abstract class IndexManagementRestTestCase : ODFERestTestCase() {
44-
val configSchemaVersion = 23
44+
val configSchemaVersion = 24
4545
val historySchemaVersion = 7
4646

4747
// Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.indexmanagement.indexstatemanagement.action
7+
8+
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
9+
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
10+
import org.opensearch.indexmanagement.indexstatemanagement.model.State
11+
import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification
12+
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
13+
import org.opensearch.indexmanagement.waitFor
14+
import java.time.Instant
15+
import java.time.temporal.ChronoUnit
16+
import java.util.Locale
17+
18+
class StopReplicationActionIT : IndexStateManagementRestTestCase() {
19+
private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT)
20+
21+
fun `test failure in stop_replication on a non-replicated index`() {
22+
val indexName = "${testIndexName}_index_1"
23+
val policyID = "${testIndexName}_testPolicyName_1"
24+
val actionConfig = StopReplicationAction(0)
25+
val states =
26+
listOf(
27+
State("StopReplicationState", listOf(actionConfig), listOf()),
28+
)
29+
30+
val policy =
31+
Policy(
32+
id = policyID,
33+
description = "$testIndexName description",
34+
schemaVersion = 1L,
35+
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
36+
errorNotification = randomErrorNotification(),
37+
defaultState = states[0].name,
38+
states = states,
39+
)
40+
createPolicy(policy, policyID)
41+
createIndex(indexName, policyID)
42+
43+
val managedIndexConfig = getExistingManagedIndexConfig(indexName)
44+
// Change the start time so the job will trigger in 2 seconds.
45+
updateManagedIndexConfigStartTime(managedIndexConfig)
46+
47+
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }
48+
49+
// Need to wait two cycles.
50+
// Change the start time so the job will trigger in 2 seconds.
51+
updateManagedIndexConfigStartTime(managedIndexConfig)
52+
waitFor {
53+
// Expecting the step to fail as there's no replication in progress on this index
54+
assertEquals(Step.StepStatus.FAILED, getExplainManagedIndexMetaData(indexName).stepMetaData?.stepStatus)
55+
assertTrue(
56+
getExplainManagedIndexMetaData(indexName).info.toString().contains("cause=No replication in progress for index:" + indexName),
57+
)
58+
}
59+
}
60+
}

0 commit comments

Comments
 (0)