From 65cd3df9a5b9f2c7f6c811a1d14521926bdef800 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Mon, 15 May 2023 11:42:16 +0200 Subject: [PATCH 1/2] Rollup Interceptor sourceIndex NPE fix (#773) * initial commit Signed-off-by: Petar Dzepina * detekt fix Signed-off-by: Petar Dzepina * few fixes Signed-off-by: Petar Dzepina * detekt override Signed-off-by: Petar Dzepina * Added test Signed-off-by: Petar Dzepina --------- Signed-off-by: Petar Dzepina (cherry picked from commit 4c60abd2d31f2738a08306944b61fcb53e6eb9a2) --- .../rollup/interceptor/RollupInterceptor.kt | 41 ++++- .../indexmanagement/util/IndexUtils.kt | 21 +-- .../rollup/interceptor/RollupInterceptorIT.kt | 170 +++++++++++++++++- 3 files changed, 203 insertions(+), 29 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt index 316451ac7..ffd1e4bd7 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt @@ -5,6 +5,9 @@ package org.opensearch.indexmanagement.rollup.interceptor +import org.apache.logging.log4j.LogManager +import org.opensearch.action.support.IndicesOptions +import org.opensearch.cluster.ClusterState import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings @@ -55,6 +58,8 @@ class RollupInterceptor( val indexNameExpressionResolver: IndexNameExpressionResolver ) : TransportInterceptor { + private val logger = LogManager.getLogger(javaClass) + @Volatile private var searchEnabled = RollupSettings.ROLLUP_SEARCH_ENABLED.get(settings) @Volatile private var searchAllJobs = RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings) @@ -92,7 +97,7 @@ class RollupInterceptor( ?: throw IllegalArgumentException("No rollup job associated with target_index") val queryFieldMappings = getQueryMetadata( request.source().query(), - IndexUtils.getConcreteIndex(rollupJob.sourceIndex, concreteIndices, clusterService.state()) + getConcreteSourceIndex(rollupJob.sourceIndex, indexNameExpressionResolver, clusterService.state()) ) val aggregationFieldMappings = getAggregationMetadata(request.source().aggregations()?.aggregatorFactories) val fieldMappings = queryFieldMappings + aggregationFieldMappings @@ -109,6 +114,26 @@ class RollupInterceptor( } } } + + fun getConcreteSourceIndex(sourceIndex: String, resolver: IndexNameExpressionResolver, clusterState: ClusterState): String { + val concreteIndexNames = resolver.concreteIndexNames(clusterState, IndicesOptions.LENIENT_EXPAND_OPEN, sourceIndex) + if (concreteIndexNames.isEmpty()) { + logger.warn("Cannot resolve rollup sourceIndex [$sourceIndex]") + return "" + } + + var concreteIndexName: String = "" + if (concreteIndexNames.size == 1 && IndexUtils.isConcreteIndex(concreteIndexNames[0], clusterState)) { + concreteIndexName = concreteIndexNames[0] + } else if (concreteIndexNames.size > 1) { + concreteIndexName = IndexUtils.getNewestIndexByCreationDate(concreteIndexNames, clusterState) + } else if (IndexUtils.isAlias(sourceIndex, clusterState) || IndexUtils.isDataStream(sourceIndex, clusterState)) { + concreteIndexName = IndexUtils.getWriteIndex(sourceIndex, clusterState) + ?: IndexUtils.getNewestIndexByCreationDate(concreteIndexNames, clusterState) // + } + return concreteIndexName + } + /* * Validate that all indices have rollup job which matches field mappings from request * TODO return compiled list of issues here instead of just throwing exception @@ -168,16 +193,15 @@ class RollupInterceptor( return fieldMappings } - @Suppress("ComplexMethod", "ThrowsCount") + @Suppress("ComplexMethod", "ThrowsCount", "LongMethod") private fun getQueryMetadata( query: QueryBuilder?, - concreteSourceIndexName: String, + concreteSourceIndexName: String?, fieldMappings: MutableSet = mutableSetOf() ): Set { if (query == null) { return fieldMappings } - when (query) { is TermQueryBuilder -> { fieldMappings.add(RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, query.fieldName(), Dimension.Type.TERMS.type)) @@ -218,6 +242,9 @@ class RollupInterceptor( fieldMappings.add(RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, query.fieldName(), Dimension.Type.TERMS.type)) } is QueryStringQueryBuilder -> { + if (concreteSourceIndexName.isNullOrEmpty()) { + throw IllegalArgumentException("Can't parse query_string query without sourceIndex mappings!") + } // Throws IllegalArgumentException if unable to parse query val (queryFields, otherFields) = QueryStringQueryUtil.extractFieldsFromQueryString(query, concreteSourceIndexName) for (field in queryFields) { @@ -231,7 +258,6 @@ class RollupInterceptor( throw IllegalArgumentException("The ${query.name} query is currently not supported in rollups") } } - return fieldMappings } @@ -302,10 +328,11 @@ class RollupInterceptor( private fun rewriteShardSearchForRollupJobs(request: ShardSearchRequest, matchingRollupJobs: Map>) { val matchedRollup = pickRollupJob(matchingRollupJobs.keys) val fieldNameMappingTypeMap = matchingRollupJobs.getValue(matchedRollup).associateBy({ it.fieldName }, { it.mappingType }) + val concreteSourceIndex = getConcreteSourceIndex(matchedRollup.sourceIndex, indexNameExpressionResolver, clusterService.state()) if (searchAllJobs) { - request.source(request.source().rewriteSearchSourceBuilder(matchingRollupJobs.keys, fieldNameMappingTypeMap, matchedRollup.sourceIndex)) + request.source(request.source().rewriteSearchSourceBuilder(matchingRollupJobs.keys, fieldNameMappingTypeMap, concreteSourceIndex)) } else { - request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap, matchedRollup.sourceIndex)) + request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap, concreteSourceIndex)) } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt index 5c5af20f3..7edf0b613 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt @@ -15,9 +15,9 @@ import org.opensearch.cluster.metadata.IndexAbstraction import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.common.hash.MurmurHash3 import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentType import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.XContentParser.Token -import org.opensearch.common.xcontent.XContentType import org.opensearch.indexmanagement.IndexManagementIndices import org.opensearch.indexmanagement.IndexManagementPlugin import java.nio.ByteBuffer @@ -234,24 +234,5 @@ class IndexUtils { return clusterState.metadata .indicesLookup[indexName]!!.type == IndexAbstraction.Type.CONCRETE_INDEX } - - fun getConcreteIndex(indexName: String, concreteIndices: Array, clusterState: ClusterState): String { - - if (concreteIndices.isEmpty()) { - throw IllegalArgumentException("ConcreteIndices list can't be empty!") - } - - var concreteIndexName: String - if (concreteIndices.size == 1 && isConcreteIndex(indexName, clusterState)) { - concreteIndexName = indexName - } else if (isAlias(indexName, clusterState) || isDataStream(indexName, clusterState)) { - concreteIndexName = getWriteIndex(indexName, clusterState) - ?: getNewestIndexByCreationDate(concreteIndices, clusterState) // - } else { - concreteIndexName = getNewestIndexByCreationDate(concreteIndices, clusterState) - } - - return concreteIndexName - } } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt index 316291db8..8af7a035b 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt @@ -5,8 +5,9 @@ package org.opensearch.indexmanagement.rollup.interceptor -import org.apache.http.entity.ContentType -import org.apache.http.entity.StringEntity +import org.apache.hc.core5.http.ContentType +import org.apache.hc.core5.http.io.entity.StringEntity +import org.junit.Assert import org.opensearch.client.ResponseException import org.opensearch.indexmanagement.common.model.dimension.DateHistogram import org.opensearch.indexmanagement.common.model.dimension.Terms @@ -1710,4 +1711,169 @@ class RollupInterceptorIT : RollupRestTestCase() { assertTrue("The query_string query field check failed!", e.message!!.contains("Could not find a rollup job that can answer this query because [missing field unknown_field]")) } } + + fun `test roll up search query_string query with index pattern as source`() { + val sourceIndex = "source_111_rollup_search_qsq_98243" + val targetIndex = "target_rollup_qsq_search_98243" + + createSampleIndexForQSQTest(sourceIndex) + + val rollup = Rollup( + id = "basic_query_string_query_rollup_search98243", + enabled = true, + schemaVersion = 1L, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic search test", + sourceIndex = "source_111*", + targetIndex = targetIndex, + metadataID = null, + roles = emptyList(), + pageSize = 10, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "event_ts", fixedInterval = "1h"), + Terms("state", "state"), + Terms("state_ext", "state_ext"), + Terms("state_ext2", "state_ext2"), + Terms("state_ordinal", "state_ordinal"), + Terms("abc test", "abc test"), + ), + metrics = listOf( + RollupMetrics( + sourceField = "earnings", targetField = "earnings", + metrics = listOf( + Sum(), Min(), Max(), + ValueCount(), Average() + ) + ) + ) + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollup) + + waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } + + refreshAllIndices() + + // Term query + var req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state:TX AND state_ext:CA AND 0", + "default_field": "state_ordinal" + } + + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + var rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + var rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + var rawAggRes = rawRes.asMap()["aggregations"] as Map> + var rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) + } + + fun `test roll up search query_string query with index pattern as source deleted`() { + val sourceIndex = "source_999_rollup_search_qsq_982439" + val targetIndex = "target_rollup_qsq_search_982439" + + createSampleIndexForQSQTest(sourceIndex) + + val rollup = Rollup( + id = "basic_query_string_query_rollup_search982499", + enabled = true, + schemaVersion = 1L, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic search test", + sourceIndex = "source_999*", + targetIndex = targetIndex, + metadataID = null, + roles = emptyList(), + pageSize = 10, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "event_ts", fixedInterval = "1h"), + Terms("state", "state"), + Terms("state_ext", "state_ext"), + Terms("state_ext2", "state_ext2"), + Terms("state_ordinal", "state_ordinal"), + Terms("abc test", "abc test"), + ), + metrics = listOf( + RollupMetrics( + sourceField = "earnings", targetField = "earnings", + metrics = listOf( + Sum(), Min(), Max(), + ValueCount(), Average() + ) + ) + ) + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollup) + + waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } + + refreshAllIndices() + + deleteIndex(sourceIndex) + + // Term query + var req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state:TX AND state_ext:CA AND 0", + "default_field": "state_ordinal" + } + + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + try { + client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + fail("Failure was expected when searching rollup index using qsq query when sourceIndex does not exist!") + } catch (e: ResponseException) { + Assert.assertTrue(e.message!!.contains("Can't parse query_string query without sourceIndex mappings!")) + } + } } From c7d632a563d1c09fc4f552a1aa33c01199d4b024 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Tue, 16 May 2023 03:01:46 +0200 Subject: [PATCH 2/2] compile fix Signed-off-by: Petar Dzepina --- .../indexmanagement/rollup/interceptor/RollupInterceptorIT.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt index 8af7a035b..e42295418 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt @@ -5,8 +5,8 @@ package org.opensearch.indexmanagement.rollup.interceptor -import org.apache.hc.core5.http.ContentType -import org.apache.hc.core5.http.io.entity.StringEntity +import org.apache.http.entity.ContentType +import org.apache.http.entity.StringEntity import org.junit.Assert import org.opensearch.client.ResponseException import org.opensearch.indexmanagement.common.model.dimension.DateHistogram