Skip to content

Commit bd0a742

Browse files
harshitakaushik-devHarshita Kaushik
and
Harshita Kaushik
committed
Step Metadata Update on Index Rollover Timeout (#1174)
Co-authored-by: Harshita Kaushik <[email protected]> Signed-off-by: harycash <[email protected]>
1 parent 59a4fe6 commit bd0a742

File tree

3 files changed

+56
-22
lines changed
  • spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement
  • src

3 files changed

+56
-22
lines changed

spi/src/main/kotlin/org.opensearch.indexmanagement.spi/indexstatemanagement/Step.kt

+1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ abstract class Step(val name: String, val isSafeToDisableOn: Boolean = true) {
5757
CONDITION_NOT_MET("condition_not_met"),
5858
FAILED("failed"),
5959
COMPLETED("completed"),
60+
TIMED_OUT("timed_out"),
6061
;
6162

6263
override fun toString(): String {

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt

+10-4
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedInde
9393
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.PolicyRetryInfoMetaData
9494
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StateMetaData
9595
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext
96+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
9697
import org.opensearch.jobscheduler.spi.JobExecutionContext
9798
import org.opensearch.jobscheduler.spi.LockModel
9899
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
@@ -339,13 +340,18 @@ object ManagedIndexRunner :
339340
if (action?.hasTimedOut(currentActionMetaData) == true) {
340341
val info = mapOf("message" to "Action timed out")
341342
logger.error("Action=${action.type} has timed out")
342-
val updated = updateManagedIndexMetaData(
343-
managedIndexMetaData
344-
.copy(actionMetaData = currentActionMetaData?.copy(failed = true), info = info),
343+
344+
val updatedIndexMetaData = managedIndexMetaData.copy(
345+
actionMetaData = currentActionMetaData?.copy(failed = true),
346+
stepMetaData = step?.let { StepMetaData(it.name, System.currentTimeMillis(), Step.StepStatus.TIMED_OUT) },
347+
info = info,
345348
)
349+
350+
val updated = updateManagedIndexMetaData(updatedIndexMetaData)
351+
346352
if (updated.metadataSaved) {
347353
disableManagedIndexConfig(managedIndexConfig)
348-
publishErrorNotification(policy, managedIndexMetaData)
354+
publishErrorNotification(policy, updatedIndexMetaData)
349355
}
350356
return
351357
}

src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionTimeoutIT.kt

+45-18
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ package org.opensearch.indexmanagement.indexstatemanagement.action
88
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
99
import org.opensearch.indexmanagement.indexstatemanagement.step.open.AttemptOpenStep
1010
import org.opensearch.indexmanagement.indexstatemanagement.step.rollover.AttemptRolloverStep
11+
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
1112
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData
1213
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
14+
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
1315
import org.opensearch.indexmanagement.waitFor
1416
import java.time.Instant
1517
import java.util.Locale
@@ -20,11 +22,12 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
2022
fun `test failed action`() {
2123
val indexName = "${testIndexName}_index_1"
2224
val policyID = "${testIndexName}_testPolicyName_1"
23-
val testPolicy = """
25+
val testPolicy =
26+
"""
2427
{"policy":{"description":"Default policy","default_state":"rolloverstate","states":[
2528
{"name":"rolloverstate","actions":[{"timeout":"1s","rollover":{"min_doc_count":100}}],
2629
"transitions":[]}]}}
27-
""".trimIndent()
30+
""".trimIndent()
2831

2932
createPolicyJson(testPolicy, policyID)
3033

@@ -52,16 +55,32 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
5255
waitFor {
5356
assertPredicatesOnMetaData(
5457
listOf(
55-
indexName to listOf(
56-
ActionMetaData.ACTION to fun(actionMetaDataMap: Any?): Boolean =
57-
assertActionEquals(
58-
ActionMetaData(
59-
name = RolloverAction.name, startTime = Instant.now().toEpochMilli(), index = 0,
60-
failed = true, consumedRetries = 0, lastRetryTime = null, actionProperties = null,
61-
),
62-
actionMetaDataMap,
63-
),
64-
),
58+
indexName to
59+
listOf(
60+
ActionMetaData.ACTION to
61+
62+
fun(actionMetaDataMap: Any?): Boolean =
63+
assertActionEquals(
64+
ActionMetaData(
65+
name = RolloverAction.name,
66+
startTime = Instant.now().toEpochMilli(),
67+
index = 0,
68+
failed = true,
69+
consumedRetries = 0,
70+
lastRetryTime = null,
71+
actionProperties = null,
72+
),
73+
actionMetaDataMap,
74+
),
75+
StepMetaData.STEP to
76+
fun(stepMetaDataMap: Any?): Boolean =
77+
assertStepEquals(
78+
StepMetaData(
79+
"attempt_rollover", Instant.now().toEpochMilli(), Step.StepStatus.TIMED_OUT,
80+
),
81+
stepMetaDataMap,
82+
),
83+
),
6584
),
6685
getExplainMap(indexName),
6786
strict = false,
@@ -73,11 +92,12 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
7392
fun `test action timeout doesn't bleed over into next action`() {
7493
val indexName = "${testIndexName}_index_2"
7594
val policyID = "${testIndexName}_testPolicyName_2"
76-
val testPolicy = """
77-
{"policy":{"description":"Default policy","default_state":"rolloverstate","states":[
78-
{"name":"rolloverstate","actions":[{"timeout": "5s","open":{}},{"timeout":"1s","rollover":{"min_doc_count":100}}],
79-
"transitions":[]}]}}
80-
""".trimIndent()
95+
val testPolicy =
96+
"""
97+
{"policy":{"description":"Default policy","default_state":"rolloverstate","states":[
98+
{"name":"rolloverstate","actions":[{"timeout": "5s","open":{}},{"timeout":"1s","rollover":{"min_doc_count":100}}],
99+
"transitions":[]}]}}
100+
""".trimIndent()
81101

82102
createPolicyJson(testPolicy, policyID)
83103

@@ -96,7 +116,14 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
96116
val expectedOpenInfoString = mapOf("message" to AttemptOpenStep.getSuccessMessage(indexName)).toString()
97117
waitFor {
98118
assertPredicatesOnMetaData(
99-
listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedOpenInfoString == info.toString())),
119+
listOf(
120+
indexName to
121+
listOf(
122+
ManagedIndexMetaData.INFO to
123+
124+
fun(info: Any?): Boolean = expectedOpenInfoString == info.toString(),
125+
),
126+
),
100127
getExplainMap(indexName),
101128
strict = false,
102129
)

0 commit comments

Comments
 (0)