Skip to content

Commit db0425d

Browse files
authored
Adding support for multi-tier rollups in ISM (opensearch-project#1533)
* Adding support for multi-tier rollups in ISM Signed-off-by: Kshitij Tandon <tandonks@amazon.com> * Using flag in place of thread context for internal search calls Signed-off-by: Kshitij Tandon <tandonks@amazon.com> * Removing some unwanted code Signed-off-by: Kshitij Tandon <tandonks@amazon.com> * Add support for source index in ISM Policy Rollup Action to help perform multi-tier rollups Signed-off-by: Kshitij Tandon <tandonks@amazon.com> * Upgrading the schema in the cached config file Signed-off-by: Kshitij Tandon <tandonks@amazon.com> * Removing some redundant comments Signed-off-by: Kshitij Tandon <tandonks@amazon.com> * Addressing comments Signed-off-by: Kshitij Tandon <tandonks@amazon.com> * Build failures Signed-off-by: Kshitij Tandon <tandonks@amazon.com> * Creating constant variable for bypass string Signed-off-by: Kshitij Tandon <tandonks@amazon.com> * Adding check for empty indices Signed-off-by: Kshitij Tandon <tandonks@amazon.com> * Fixing MapperService tests Signed-off-by: Kshitij Tandon <tandonks@amazon.com> --------- Signed-off-by: Kshitij Tandon <tandonks@amazon.com>
1 parent 4f007f2 commit db0425d

23 files changed

Lines changed: 3685 additions & 68 deletions

src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,7 @@ class IndexManagementPlugin :
436436
.registerMapperService(RollupMapperService(client, clusterService, indexNameExpressionResolver))
437437
.registerIndexer(RollupIndexer(settings, clusterService, client))
438438
.registerSearcher(RollupSearchService(settings, clusterService, client))
439-
.registerMetadataServices(RollupMetadataService(client, xContentRegistry))
439+
.registerMetadataServices(RollupMetadataService(client, xContentRegistry, clusterService))
440440
.registerConsumers()
441441
.registerClusterConfigurationProvider(skipFlag)
442442
indexManagementIndices = IndexManagementIndices(settings, client.admin().indices(), clusterService)

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollup/AttemptCreateRollupJobStep.kt

Lines changed: 74 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import org.opensearch.indexmanagement.rollup.action.index.IndexRollupRequest
1818
import org.opensearch.indexmanagement.rollup.action.index.IndexRollupResponse
1919
import org.opensearch.indexmanagement.rollup.action.start.StartRollupAction
2020
import org.opensearch.indexmanagement.rollup.action.start.StartRollupRequest
21+
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
2122
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
2223
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
2324
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
@@ -39,39 +40,100 @@ class AttemptCreateRollupJobStep(private val action: RollupAction) : Step(name)
3940
val previousRunRollupId = managedIndexMetadata.actionMetaData?.actionProperties?.rollupId
4041
val hasPreviousRollupAttemptFailed = managedIndexMetadata.actionMetaData?.actionProperties?.hasRollupFailed
4142

42-
// Creating a rollup job
43-
val rollup = action.ismRollup.toRollup(indexName, context.user)
44-
rollupId = rollup.id
45-
logger.info("Attempting to create a rollup job $rollupId for index $indexName")
46-
47-
val indexRollupRequest = IndexRollupRequest(rollup, WriteRequest.RefreshPolicy.IMMEDIATE)
48-
4943
try {
44+
// Create a temporary rollup object for template resolution context.
45+
// This provides the rollup's source_index as {{ctx.source_index}} in templates.
46+
val tempRollup = action.ismRollup.toRollup(indexName, context.user)
47+
48+
// Resolve source_index template if provided, else use managed index name.
49+
// This enables patterns like:
50+
// - source_index: "{{ctx.index}}" -> resolves to the managed index name
51+
// - source_index: null -> defaults to the managed index name (backward compatible)
52+
val resolvedSourceIndex = if (action.ismRollup.sourceIndex != null) {
53+
RollupFieldValueExpressionResolver.resolve(
54+
tempRollup,
55+
action.ismRollup.sourceIndex,
56+
indexName,
57+
)
58+
} else {
59+
indexName
60+
}
61+
62+
// Resolve target_index template.
63+
val resolvedTargetIndex = RollupFieldValueExpressionResolver.resolve(
64+
tempRollup,
65+
action.ismRollup.targetIndex,
66+
indexName,
67+
)
68+
69+
// Validate resolved indices to ensure they are valid and different.
70+
// This catches configuration errors early before attempting to create the rollup job.
71+
validateResolvedIndices(resolvedSourceIndex, resolvedTargetIndex)
72+
73+
logger.info(
74+
"Executing rollup from source [$resolvedSourceIndex] to target [$resolvedTargetIndex] " +
75+
"for managed index [$indexName]",
76+
)
77+
78+
// Create the final rollup job with resolved source_index and target_index.
79+
val rollup = action.ismRollup.toRollup(indexName, context.user)
80+
.copy(sourceIndex = resolvedSourceIndex, targetIndex = resolvedTargetIndex)
81+
rollupId = rollup.id
82+
logger.info("Attempting to create a rollup job $rollupId for index $indexName")
83+
84+
val indexRollupRequest = IndexRollupRequest(rollup, WriteRequest.RefreshPolicy.IMMEDIATE)
5085
val response: IndexRollupResponse = context.client.suspendUntil { execute(IndexRollupAction.INSTANCE, indexRollupRequest, it) }
5186
logger.info("Received status ${response.status.status} on trying to create rollup job $rollupId")
5287

5388
stepStatus = StepStatus.COMPLETED
5489
info = mapOf("message" to getSuccessMessage(rollup.id, indexName))
90+
} catch (e: IllegalArgumentException) {
91+
val message = "Failed to validate resolved indices for rollup job"
92+
logger.error(message, e)
93+
stepStatus = StepStatus.FAILED
94+
info = mapOf("message" to message, "cause" to "${e.message}")
5595
} catch (e: VersionConflictEngineException) {
56-
val message = getFailedJobExistsMessage(rollup.id, indexName)
96+
val message = getFailedJobExistsMessage(rollupId ?: "unknown", indexName)
5797
logger.info(message)
5898
if (rollupId == previousRunRollupId && hasPreviousRollupAttemptFailed == true) {
59-
startRollupJob(rollup.id, context)
99+
startRollupJob(rollupId ?: "unknown", context)
60100
} else {
61101
stepStatus = StepStatus.COMPLETED
62102
info = mapOf("info" to message)
63103
}
64104
} catch (e: RemoteTransportException) {
65-
processFailure(rollup.id, indexName, ExceptionsHelper.unwrapCause(e) as Exception)
105+
processFailure(rollupId ?: "unknown", indexName, ExceptionsHelper.unwrapCause(e) as Exception)
66106
} catch (e: OpenSearchException) {
67-
processFailure(rollup.id, indexName, e)
107+
processFailure(rollupId ?: "unknown", indexName, e)
68108
} catch (e: Exception) {
69-
processFailure(rollup.id, indexName, e)
109+
val message = "Failed to create rollup job"
110+
logger.error(message, e)
111+
stepStatus = StepStatus.FAILED
112+
info = mapOf("message" to message, "cause" to "${e.message}")
70113
}
71114

72115
return this
73116
}
74117

118+
/**
119+
* Validates that resolved source and target indices are valid and different.
120+
*
121+
* @param sourceIndex The resolved source index name (after template resolution)
122+
* @param targetIndex The resolved target index name (after template resolution)
123+
* @throws IllegalArgumentException if any validation rule fails, with a descriptive error message
124+
*/
125+
private fun validateResolvedIndices(sourceIndex: String, targetIndex: String) {
126+
require(sourceIndex.isNotBlank()) {
127+
"Resolved source_index cannot be empty"
128+
}
129+
require(targetIndex.isNotBlank()) {
130+
"Resolved target_index cannot be empty"
131+
}
132+
require(sourceIndex != targetIndex) {
133+
"Source and target indices must be different: $sourceIndex"
134+
}
135+
}
136+
75137
fun processFailure(rollupId: String, indexName: String, e: Exception) {
76138
val message = getFailedMessage(rollupId, indexName)
77139
logger.error(message, e)

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,12 @@ class RollupMapperService(
297297

298298
val indexMappingSource = indexTypeMappings.sourceAsMap
299299

300+
// If source is a rollup index with no properties (no data rolled up yet), skip field validation
301+
if (isRollupIndex(index, clusterService.state()) && !indexMappingSource.containsKey("properties")) {
302+
logger.info("Source rollup index [$index] has no properties yet, skipping field validation")
303+
return RollupJobValidationResult.Valid
304+
}
305+
300306
val issues = mutableSetOf<String>()
301307
// Validate source fields in dimensions
302308
rollup.dimensions.forEach { dimension ->

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,18 @@ import org.opensearch.indexmanagement.IndexManagementPlugin
3131
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
3232
import org.opensearch.indexmanagement.opensearchapi.parseWithType
3333
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
34+
import org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor
3435
import org.opensearch.indexmanagement.rollup.model.ContinuousMetadata
3536
import org.opensearch.indexmanagement.rollup.model.Rollup
3637
import org.opensearch.indexmanagement.rollup.model.RollupMetadata
3738
import org.opensearch.indexmanagement.rollup.model.RollupStats
3839
import org.opensearch.indexmanagement.rollup.util.DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT
40+
import org.opensearch.indexmanagement.rollup.util.isRollupIndex
3941
import org.opensearch.indexmanagement.util.NO_ID
4042
import org.opensearch.search.aggregations.bucket.composite.InternalComposite
4143
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder
4244
import org.opensearch.search.builder.SearchSourceBuilder
45+
import org.opensearch.search.fetch.subphase.FetchSourceContext
4346
import org.opensearch.search.sort.SortOrder
4447
import org.opensearch.transport.RemoteTransportException
4548
import org.opensearch.transport.client.Client
@@ -48,7 +51,11 @@ import java.time.Instant
4851
// TODO: Wrap client calls in retry for transient failures
4952
// Service that handles CRUD operations for rollup metadata
5053
@Suppress("TooManyFunctions")
51-
class RollupMetadataService(val client: Client, val xContentRegistry: NamedXContentRegistry) {
54+
class RollupMetadataService(
55+
val client: Client,
56+
val xContentRegistry: NamedXContentRegistry,
57+
val clusterService: org.opensearch.cluster.service.ClusterService,
58+
) {
5259
private val logger = LogManager.getLogger(javaClass)
5360

5461
// If the job does not have a metadataID then we need to initialize the first metadata
@@ -186,6 +193,12 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont
186193
@Throws(Exception::class)
187194
private suspend fun getInitialStartTime(rollup: Rollup): StartingTimeResult {
188195
try {
196+
// Check if source is a rollup index and use appropriate method
197+
val isSourceRollupIndex = isRollupIndex(rollup.sourceIndex, clusterService.state())
198+
if (isSourceRollupIndex) {
199+
// Use min aggregation for rollup indices (RollupInterceptor blocks size > 0)
200+
return getEarliestTimestampFromRollupIndex(rollup)
201+
}
189202
// Rollup requires the first dimension to be the date histogram
190203
val dateHistogram = rollup.dimensions.first() as DateHistogram
191204
val searchSourceBuilder =
@@ -227,6 +240,60 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont
227240
}
228241
}
229242

243+
/**
244+
* Get the earliest timestamp from a rollup index by finding the minimum value of the date histogram field.
245+
* This is used to determine the starting point for continuous rollups on rollup indices.
246+
* Uses sort instead of aggregation to avoid rollup interceptor validation.
247+
*/
248+
@Suppress("ReturnCount")
249+
@Throws(Exception::class)
250+
private suspend fun getEarliestTimestampFromRollupIndex(rollup: Rollup): StartingTimeResult {
251+
try {
252+
val dateHistogram = rollup.dimensions.first() as DateHistogram
253+
val dateField = dateHistogram.sourceField
254+
255+
// For multi-tier rollup, we would be querying a document on a rollup index
256+
// So we set this bypassMarker in fetchSource as a flag to help bypass the validation in RollupInterceptor
257+
val bypassMarker = "${RollupInterceptor.BYPASS_MARKER_PREFIX}${RollupInterceptor.BYPASS_SIZE_CHECK}"
258+
259+
val searchRequest = SearchRequest(rollup.sourceIndex)
260+
.source(
261+
SearchSourceBuilder()
262+
.size(1)
263+
.query(MatchAllQueryBuilder())
264+
.sort("$dateField.date_histogram", SortOrder.ASC)
265+
.trackTotalHits(false)
266+
.fetchSource(FetchSourceContext(false, arrayOf(bypassMarker), emptyArray()))
267+
.docValueField("$dateField.date_histogram", DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT),
268+
)
269+
.allowPartialSearchResults(false)
270+
271+
val response: SearchResponse = client.suspendUntil { search(searchRequest, it) }
272+
273+
if (response.hits.hits.isEmpty()) {
274+
return StartingTimeResult.NoDocumentsFound
275+
}
276+
277+
// In rollup indices, date histogram fields are named as "field.date_histogram"
278+
val rollupDateField = "$dateField.date_histogram"
279+
val firstHitTimestampAsString: String =
280+
response.hits.hits.first().field(rollupDateField).getValue<String>()
281+
?: return StartingTimeResult.NoDocumentsFound
282+
283+
val formatter = DateFormatter.forPattern(DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT)
284+
val epochMillis = DateFormatters.from(formatter.parse(firstHitTimestampAsString), formatter.locale()).toInstant().toEpochMilli()
285+
return StartingTimeResult.Success(getRoundedTime(epochMillis, dateHistogram))
286+
} catch (e: RemoteTransportException) {
287+
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
288+
logger.error("Error when getting earliest timestamp from rollup index for rollup [{}]: {}", rollup.id, unwrappedException)
289+
return StartingTimeResult.Failure(unwrappedException)
290+
} catch (e: Exception) {
291+
// TODO: Catching general exceptions for now, can make more granular
292+
logger.error("Error when getting earliest timestamp from rollup index for rollup [{}]: {}", rollup.id, e)
293+
return StartingTimeResult.Failure(e)
294+
}
295+
}
296+
230297
/**
231298
* Return time rounded down to the nearest unit of time the interval is based on.
232299
* This should map to the equivalent bucket a document with the given timestamp would fall into for the date histogram.

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupRunner.kt

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ import org.opensearch.indexmanagement.rollup.model.RollupStats
3535
import org.opensearch.indexmanagement.rollup.model.incrementStats
3636
import org.opensearch.indexmanagement.rollup.model.mergeStats
3737
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
38+
import org.opensearch.indexmanagement.rollup.util.getDateHistogram
39+
import org.opensearch.indexmanagement.rollup.util.getRollupJobs
40+
import org.opensearch.indexmanagement.rollup.util.isRollupIndex
3841
import org.opensearch.indexmanagement.util.acquireLockForScheduledJob
3942
import org.opensearch.indexmanagement.util.releaseLockForScheduledJob
4043
import org.opensearch.indexmanagement.util.renewLockForScheduledJob
@@ -44,6 +47,8 @@ import org.opensearch.jobscheduler.spi.ScheduledJobParameter
4447
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
4548
import org.opensearch.script.ScriptService
4649
import org.opensearch.search.aggregations.bucket.composite.InternalComposite
50+
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder
51+
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval
4752
import org.opensearch.threadpool.ThreadPool
4853
import org.opensearch.transport.client.Client
4954

@@ -277,7 +282,7 @@ object RollupRunner :
277282
withClosableContext(
278283
IndexManagementSecurityContext(job.id, settings, threadPool.threadContext, job.user),
279284
) {
280-
rollupSearchService.executeCompositeSearch(updatableJob, metadata)
285+
rollupSearchService.executeCompositeSearch(updatableJob, metadata, clusterService)
281286
}
282287
val rollupResult =
283288
when (rollupSearchResult) {
@@ -431,6 +436,17 @@ object RollupRunner :
431436
else -> return@withClosableContext sourceIndexValidationResult
432437
}
433438

439+
// Additional validation for rollup-on-rollup scenarios
440+
if (isRollupIndex(job.sourceIndex, clusterService.state())) {
441+
when (val rollupValidationResult = validateRollupOnRollup(job)) {
442+
is RollupJobValidationResult.Valid -> {
443+
}
444+
445+
// No action taken when valid
446+
else -> return@withClosableContext rollupValidationResult
447+
}
448+
}
449+
434450
// we validate target index only if there is metadata document in the rollup
435451
if (metadata != null) {
436452
logger.debug("Attempting to create/validate target index [${job.targetIndex}] for rollup job [${job.id}]")
@@ -492,6 +508,76 @@ object RollupRunner :
492508
}
493509
}
494510
}
511+
512+
@Suppress("ReturnCount")
513+
private fun validateRollupOnRollup(job: Rollup): RollupJobValidationResult {
514+
val sourceRollupJobs = clusterService.state().metadata.index(job.sourceIndex).getRollupJobs()
515+
if (sourceRollupJobs == null) {
516+
return RollupJobValidationResult.Invalid("Source rollup index has no rollup jobs")
517+
}
518+
519+
val sourceJob = sourceRollupJobs.first()
520+
val targetDateHistogram = job.getDateHistogram()
521+
val sourceDateHistogram = sourceJob.getDateHistogram()
522+
523+
// Validate interval alignment
524+
val sourceInterval = sourceDateHistogram.fixedInterval ?: sourceDateHistogram.calendarInterval!!
525+
val targetInterval = targetDateHistogram.fixedInterval ?: targetDateHistogram.calendarInterval!!
526+
val intervalValid = validateIntervalAlignment(sourceInterval, targetInterval)
527+
if (!intervalValid) {
528+
return RollupJobValidationResult.Invalid(
529+
"Target interval [$targetInterval] must be an exact multiple of source interval [$sourceInterval]",
530+
)
531+
}
532+
533+
// Validate source field compatibility
534+
val sourceDimensionFields = sourceJob.dimensions.map { it.sourceField }.toSet()
535+
val sourceMetricFields = sourceJob.metrics.map { it.sourceField }.toSet()
536+
val targetDimensionFields = job.dimensions.map { it.sourceField }.toSet()
537+
val targetMetricFields = job.metrics.map { it.sourceField }.toSet()
538+
539+
val invalidDimensionFields = targetDimensionFields - sourceDimensionFields
540+
val invalidMetricFields = targetMetricFields - sourceMetricFields
541+
542+
return when {
543+
invalidDimensionFields.isNotEmpty() -> RollupJobValidationResult.Invalid(
544+
"Cannot rollup on dimension fields $invalidDimensionFields that don't exist in source rollup",
545+
)
546+
547+
invalidMetricFields.isNotEmpty() -> RollupJobValidationResult.Invalid(
548+
"Cannot rollup on metric fields $invalidMetricFields that don't exist in source rollup",
549+
)
550+
551+
else -> {
552+
// Validate metric compatibility
553+
val sourceMetrics = sourceJob.metrics.flatMap { it.metrics }.map { it.type.type }.toSet()
554+
val targetMetrics = job.metrics.flatMap { it.metrics }.map { it.type.type }.toSet()
555+
val unsupportedMetrics = targetMetrics - sourceMetrics
556+
557+
if (unsupportedMetrics.isNotEmpty()) {
558+
RollupJobValidationResult.Invalid(
559+
"Target rollup requests metrics $unsupportedMetrics that are not available in source rollup",
560+
)
561+
} else {
562+
RollupJobValidationResult.Valid
563+
}
564+
}
565+
}
566+
}
567+
568+
private fun validateIntervalAlignment(sourceInterval: String, targetInterval: String): Boolean = try {
569+
val sourceMillis = parseIntervalToMillis(sourceInterval)
570+
val targetMillis = parseIntervalToMillis(targetInterval)
571+
targetMillis % sourceMillis == 0L && targetMillis > sourceMillis
572+
} catch (e: Exception) {
573+
true // Let it through and fail later with better error
574+
}
575+
576+
private fun parseIntervalToMillis(interval: String): Long = if (DateHistogramAggregationBuilder.DATE_FIELD_UNITS.containsKey(interval)) {
577+
DateHistogramInterval(interval).estimateMillis()
578+
} else {
579+
TimeValue.parseTimeValue(interval, "parseIntervalToMillis").millis
580+
}
495581
}
496582

497583
sealed class RollupJobResult {

0 commit comments

Comments
 (0)