[backport 2.x] Rollup Interceptor sourceIndex NPE fix (#773) #781

Merged
merged 2 commits on May 16, 2023

RollupInterceptor.kt
@@ -5,6 +5,9 @@

package org.opensearch.indexmanagement.rollup.interceptor

import org.apache.logging.log4j.LogManager
import org.opensearch.action.support.IndicesOptions
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
@@ -55,6 +58,8 @@ class RollupInterceptor(
val indexNameExpressionResolver: IndexNameExpressionResolver
) : TransportInterceptor {

private val logger = LogManager.getLogger(javaClass)

@Volatile private var searchEnabled = RollupSettings.ROLLUP_SEARCH_ENABLED.get(settings)
@Volatile private var searchAllJobs = RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings)

@@ -92,7 +97,7 @@ class RollupInterceptor(
?: throw IllegalArgumentException("No rollup job associated with target_index")
val queryFieldMappings = getQueryMetadata(
request.source().query(),
- IndexUtils.getConcreteIndex(rollupJob.sourceIndex, concreteIndices, clusterService.state())
+ getConcreteSourceIndex(rollupJob.sourceIndex, indexNameExpressionResolver, clusterService.state())
)
val aggregationFieldMappings = getAggregationMetadata(request.source().aggregations()?.aggregatorFactories)
val fieldMappings = queryFieldMappings + aggregationFieldMappings
@@ -109,6 +114,26 @@
}
}
}

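/*
* Resolution behaviour implemented below (example expressions are illustrative):
*  - exactly one concrete match: that index is used
*  - several matches (e.g. a pattern like "source_111*"): the newest matching index by creation date
*  - otherwise, an alias or data stream: its write index, falling back to the newest backing index
*  - no match at all (e.g. the source index was deleted): "" is returned and a warning is logged
*/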
fun getConcreteSourceIndex(sourceIndex: String, resolver: IndexNameExpressionResolver, clusterState: ClusterState): String {
val concreteIndexNames = resolver.concreteIndexNames(clusterState, IndicesOptions.LENIENT_EXPAND_OPEN, sourceIndex)
if (concreteIndexNames.isEmpty()) {
logger.warn("Cannot resolve rollup sourceIndex [$sourceIndex]")
return ""
}

var concreteIndexName: String = ""
if (concreteIndexNames.size == 1 && IndexUtils.isConcreteIndex(concreteIndexNames[0], clusterState)) {
concreteIndexName = concreteIndexNames[0]
} else if (concreteIndexNames.size > 1) {
concreteIndexName = IndexUtils.getNewestIndexByCreationDate(concreteIndexNames, clusterState)
} else if (IndexUtils.isAlias(sourceIndex, clusterState) || IndexUtils.isDataStream(sourceIndex, clusterState)) {
concreteIndexName = IndexUtils.getWriteIndex(sourceIndex, clusterState)
?: IndexUtils.getNewestIndexByCreationDate(concreteIndexNames, clusterState)
}
return concreteIndexName
}

/*
* Validate that all indices have a rollup job that matches the field mappings from the request
* TODO return compiled list of issues here instead of just throwing exception
@@ -168,16 +193,15 @@
return fieldMappings
}

- @Suppress("ComplexMethod", "ThrowsCount")
+ @Suppress("ComplexMethod", "ThrowsCount", "LongMethod")
private fun getQueryMetadata(
query: QueryBuilder?,
- concreteSourceIndexName: String,
+ concreteSourceIndexName: String?,
fieldMappings: MutableSet<RollupFieldMapping> = mutableSetOf()
): Set<RollupFieldMapping> {
if (query == null) {
return fieldMappings
}

when (query) {
is TermQueryBuilder -> {
fieldMappings.add(RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, query.fieldName(), Dimension.Type.TERMS.type))
@@ -218,6 +242,9 @@
fieldMappings.add(RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, query.fieldName(), Dimension.Type.TERMS.type))
}
is QueryStringQueryBuilder -> {
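// query_string field extraction below relies on the source index mappings, so fail fast when the
// source index could not be resolved (for example, it was deleted after the rollup ran)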
if (concreteSourceIndexName.isNullOrEmpty()) {
throw IllegalArgumentException("Can't parse query_string query without sourceIndex mappings!")
}
// Throws IllegalArgumentException if unable to parse query
val (queryFields, otherFields) = QueryStringQueryUtil.extractFieldsFromQueryString(query, concreteSourceIndexName)
for (field in queryFields) {
@@ -231,7 +258,6 @@
throw IllegalArgumentException("The ${query.name} query is currently not supported in rollups")
}
}

return fieldMappings
}

@@ -302,10 +328,11 @@
private fun rewriteShardSearchForRollupJobs(request: ShardSearchRequest, matchingRollupJobs: Map<Rollup, Set<RollupFieldMapping>>) {
val matchedRollup = pickRollupJob(matchingRollupJobs.keys)
val fieldNameMappingTypeMap = matchingRollupJobs.getValue(matchedRollup).associateBy({ it.fieldName }, { it.mappingType })
val concreteSourceIndex = getConcreteSourceIndex(matchedRollup.sourceIndex, indexNameExpressionResolver, clusterService.state())
if (searchAllJobs) {
- request.source(request.source().rewriteSearchSourceBuilder(matchingRollupJobs.keys, fieldNameMappingTypeMap, matchedRollup.sourceIndex))
+ request.source(request.source().rewriteSearchSourceBuilder(matchingRollupJobs.keys, fieldNameMappingTypeMap, concreteSourceIndex))
} else {
- request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap, matchedRollup.sourceIndex))
+ request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap, concreteSourceIndex))
}
}
}
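
For context on the change above: the interceptor now resolves the job's sourceIndex through OpenSearch's IndexNameExpressionResolver with IndicesOptions.LENIENT_EXPAND_OPEN, so a deleted or unmatched expression resolves to nothing instead of throwing, letting the interceptor return "" and reject query_string parsing with a clear message. The older path through IndexUtils.getConcreteIndex (removed in the next file) went via isConcreteIndex, whose indicesLookup[indexName]!! is presumably where the NPE in the title originated when sourceIndex was a pattern. Below is a minimal, self-contained sketch of that resolver call in isolation; the helper name describeResolution and its use are illustrative, not part of the change.

import org.opensearch.action.support.IndicesOptions
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexNameExpressionResolver

// Hypothetical standalone helper: shows what LENIENT_EXPAND_OPEN resolution yields for the cases
// getConcreteSourceIndex has to handle. A missing or deleted expression resolves to an empty array
// rather than throwing, which is what lets the interceptor degrade gracefully.
fun describeResolution(expression: String, resolver: IndexNameExpressionResolver, state: ClusterState): String {
    val names = resolver.concreteIndexNames(state, IndicesOptions.LENIENT_EXPAND_OPEN, expression)
    return when {
        names.isEmpty() -> "unresolvable: $expression"
        names.size == 1 -> "single concrete index: ${names[0]}"
        else -> "multiple backing indices: ${names.joinToString()}"
    }
}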

IndexUtils.kt
@@ -15,9 +15,9 @@ import org.opensearch.cluster.metadata.IndexAbstraction
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.common.hash.MurmurHash3
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentType
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.IndexManagementPlugin
import java.nio.ByteBuffer
@@ -234,24 +234,5 @@ class IndexUtils {
return clusterState.metadata
.indicesLookup[indexName]!!.type == IndexAbstraction.Type.CONCRETE_INDEX
}

- fun getConcreteIndex(indexName: String, concreteIndices: Array<String>, clusterState: ClusterState): String {
-
- if (concreteIndices.isEmpty()) {
- throw IllegalArgumentException("ConcreteIndices list can't be empty!")
- }
-
- var concreteIndexName: String
- if (concreteIndices.size == 1 && isConcreteIndex(indexName, clusterState)) {
- concreteIndexName = indexName
- } else if (isAlias(indexName, clusterState) || isDataStream(indexName, clusterState)) {
- concreteIndexName = getWriteIndex(indexName, clusterState)
- ?: getNewestIndexByCreationDate(concreteIndices, clusterState)
- } else {
- concreteIndexName = getNewestIndexByCreationDate(concreteIndices, clusterState)
- }
-
- return concreteIndexName
- }
}
}

RollupInterceptorIT.kt
@@ -7,6 +7,7 @@ package org.opensearch.indexmanagement.rollup.interceptor

import org.apache.http.entity.ContentType
import org.apache.http.entity.StringEntity
import org.junit.Assert
import org.opensearch.client.ResponseException
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
import org.opensearch.indexmanagement.common.model.dimension.Terms
@@ -1710,4 +1711,169 @@ class RollupInterceptorIT : RollupRestTestCase() {
assertTrue("The query_string query field check failed!", e.message!!.contains("Could not find a rollup job that can answer this query because [missing field unknown_field]"))
}
}

fun `test roll up search query_string query with index pattern as source`() {
val sourceIndex = "source_111_rollup_search_qsq_98243"
val targetIndex = "target_rollup_qsq_search_98243"

createSampleIndexForQSQTest(sourceIndex)

val rollup = Rollup(
id = "basic_query_string_query_rollup_search98243",
enabled = true,
schemaVersion = 1L,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobLastUpdatedTime = Instant.now(),
jobEnabledTime = Instant.now(),
description = "basic search test",
sourceIndex = "source_111*",
targetIndex = targetIndex,
metadataID = null,
roles = emptyList(),
pageSize = 10,
delay = 0,
continuous = false,
dimensions = listOf(
DateHistogram(sourceField = "event_ts", fixedInterval = "1h"),
Terms("state", "state"),
Terms("state_ext", "state_ext"),
Terms("state_ext2", "state_ext2"),
Terms("state_ordinal", "state_ordinal"),
Terms("abc test", "abc test"),
),
metrics = listOf(
RollupMetrics(
sourceField = "earnings", targetField = "earnings",
metrics = listOf(
Sum(), Min(), Max(),
ValueCount(), Average()
)
)
)
).let { createRollup(it, it.id) }

updateRollupStartTime(rollup)

waitFor {
val rollupJob = getRollup(rollupId = rollup.id)
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
}

refreshAllIndices()

// Query string query
var req = """
{
"size": 0,
"query": {
"query_string": {
"query": "state:TX AND state_ext:CA AND 0",
"default_field": "state_ordinal"
}

},
"aggs": {
"earnings_total": {
"sum": {
"field": "earnings"
}
}
}
}
""".trimIndent()
var rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rawRes.restStatus() == RestStatus.OK)
var rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rollupRes.restStatus() == RestStatus.OK)
var rawAggRes = rawRes.asMap()["aggregations"] as Map<String, Map<String, Any>>
var rollupAggRes = rollupRes.asMap()["aggregations"] as Map<String, Map<String, Any>>
assertEquals(
"Source and rollup index did not return the same sum results",
rawAggRes.getValue("earnings_total")["value"],
rollupAggRes.getValue("earnings_total")["value"]
)
}

fun `test roll up search query_string query with index pattern as source deleted`() {
val sourceIndex = "source_999_rollup_search_qsq_982439"
val targetIndex = "target_rollup_qsq_search_982439"

createSampleIndexForQSQTest(sourceIndex)

val rollup = Rollup(
id = "basic_query_string_query_rollup_search982499",
enabled = true,
schemaVersion = 1L,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobLastUpdatedTime = Instant.now(),
jobEnabledTime = Instant.now(),
description = "basic search test",
sourceIndex = "source_999*",
targetIndex = targetIndex,
metadataID = null,
roles = emptyList(),
pageSize = 10,
delay = 0,
continuous = false,
dimensions = listOf(
DateHistogram(sourceField = "event_ts", fixedInterval = "1h"),
Terms("state", "state"),
Terms("state_ext", "state_ext"),
Terms("state_ext2", "state_ext2"),
Terms("state_ordinal", "state_ordinal"),
Terms("abc test", "abc test"),
),
metrics = listOf(
RollupMetrics(
sourceField = "earnings", targetField = "earnings",
metrics = listOf(
Sum(), Min(), Max(),
ValueCount(), Average()
)
)
)
).let { createRollup(it, it.id) }

updateRollupStartTime(rollup)

waitFor {
val rollupJob = getRollup(rollupId = rollup.id)
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
}

refreshAllIndices()

deleteIndex(sourceIndex)

// Query string query
var req = """
{
"size": 0,
"query": {
"query_string": {
"query": "state:TX AND state_ext:CA AND 0",
"default_field": "state_ordinal"
}

},
"aggs": {
"earnings_total": {
"sum": {
"field": "earnings"
}
}
}
}
""".trimIndent()
try {
client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
fail("Failure was expected when searching the rollup index with a query_string query while sourceIndex does not exist!")
} catch (e: ResponseException) {
Assert.assertTrue(e.message!!.contains("Can't parse query_string query without sourceIndex mappings!"))
}
}
}