Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) {
CONDITION_NOT_MET("condition_not_met"),
FAILED("failed"),
COMPLETED("completed"),
TIMED_OUT("timed_out"),
;

override fun toString(): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedInde
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetryInfoMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StateMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.jobscheduler.spi.JobExecutionContext
import org.opensearch.jobscheduler.spi.LockModel
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
Expand Down Expand Up @@ -339,13 +340,18 @@ object ManagedIndexRunner :
if (action?.hasTimedOut(currentActionMetaData) == true) {
val info = mapOf("message" to "Action timed out")
logger.error("Action=${action.type} has timed out")
val updated = updateManagedIndexMetaData(
managedIndexMetaData
.copy(actionMetaData = currentActionMetaData?.copy(failed = true), info = info),

val updatedIndexMetaData = managedIndexMetaData.copy(
actionMetaData = currentActionMetaData?.copy(failed = true),
stepMetaData = step?.let { StepMetaData(it.name, System.currentTimeMillis(), Step.StepStatus.TIMED_OUT) },
info = info,
)

val updated = updateManagedIndexMetaData(updatedIndexMetaData)

if (updated.metadataSaved) {
disableManagedIndexConfig(managedIndexConfig)
publishErrorNotification(policy, managedIndexMetaData)
publishErrorNotification(policy, updatedIndexMetaData)
}
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ package org.opensearch.indexmanagement.indexstatemanagement.action
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
import org.opensearch.indexmanagement.indexstatemanagement.step.open.AttemptOpenStep
import org.opensearch.indexmanagement.indexstatemanagement.step.rollover.AttemptRolloverStep
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.indexmanagement.waitFor
import java.time.Instant
import java.util.Locale
Expand All @@ -20,11 +22,12 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
fun `test failed action`() {
val indexName = "${testIndexName}_index_1"
val policyID = "${testIndexName}_testPolicyName_1"
val testPolicy = """
val testPolicy =
"""
{"policy":{"description":"Default policy","default_state":"rolloverstate","states":[
{"name":"rolloverstate","actions":[{"timeout":"1s","rollover":{"min_doc_count":100}}],
"transitions":[]}]}}
""".trimIndent()
""".trimIndent()

createPolicyJson(testPolicy, policyID)

Expand Down Expand Up @@ -52,16 +55,32 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
waitFor {
assertPredicatesOnMetaData(
listOf(
indexName to listOf(
ActionMetaData.ACTION to fun(actionMetaDataMap: Any?): Boolean =
assertActionEquals(
ActionMetaData(
name = RolloverAction.name, startTime = Instant.now().toEpochMilli(), index = 0,
failed = true, consumedRetries = 0, lastRetryTime = null, actionProperties = null,
),
actionMetaDataMap,
),
),
indexName to
listOf(
ActionMetaData.ACTION to

fun(actionMetaDataMap: Any?): Boolean =
assertActionEquals(
ActionMetaData(
name = RolloverAction.name,
startTime = Instant.now().toEpochMilli(),
index = 0,
failed = true,
consumedRetries = 0,
lastRetryTime = null,
actionProperties = null,
),
actionMetaDataMap,
),
StepMetaData.STEP to
fun(stepMetaDataMap: Any?): Boolean =
assertStepEquals(
StepMetaData(
"attempt_rollover", Instant.now().toEpochMilli(), Step.StepStatus.TIMED_OUT,
),
stepMetaDataMap,
),
),
),
getExplainMap(indexName),
strict = false,
Expand All @@ -73,11 +92,12 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
fun `test action timeout doesn't bleed over into next action`() {
val indexName = "${testIndexName}_index_2"
val policyID = "${testIndexName}_testPolicyName_2"
val testPolicy = """
{"policy":{"description":"Default policy","default_state":"rolloverstate","states":[
{"name":"rolloverstate","actions":[{"timeout": "5s","open":{}},{"timeout":"1s","rollover":{"min_doc_count":100}}],
"transitions":[]}]}}
""".trimIndent()
val testPolicy =
"""
{"policy":{"description":"Default policy","default_state":"rolloverstate","states":[
{"name":"rolloverstate","actions":[{"timeout": "5s","open":{}},{"timeout":"1s","rollover":{"min_doc_count":100}}],
"transitions":[]}]}}
""".trimIndent()

createPolicyJson(testPolicy, policyID)

Expand All @@ -96,7 +116,14 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
val expectedOpenInfoString = mapOf("message" to AttemptOpenStep.getSuccessMessage(indexName)).toString()
waitFor {
assertPredicatesOnMetaData(
listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedOpenInfoString == info.toString())),
listOf(
indexName to
listOf(
ManagedIndexMetaData.INFO to

fun(info: Any?): Boolean = expectedOpenInfoString == info.toString(),
),
),
getExplainMap(indexName),
strict = false,
)
Expand Down