Skip to content

Commit dc4f9cf

Browse files
petardzronnaksaxena
authored andcommitted
[backport 2.x] Rollup Interceptor sourceIndex NPE fix (opensearch-project#773) (opensearch-project#781)
* Rollup Interceptor sourceIndex NPE fix (opensearch-project#773) * initial commit Signed-off-by: Petar Dzepina <[email protected]> * detekt fix Signed-off-by: Petar Dzepina <[email protected]> * few fixes Signed-off-by: Petar Dzepina <[email protected]> * detekt override Signed-off-by: Petar Dzepina <[email protected]> * Added test Signed-off-by: Petar Dzepina <[email protected]> --------- Signed-off-by: Petar Dzepina <[email protected]> (cherry picked from commit 4c60abd) * compile fix Signed-off-by: Petar Dzepina <[email protected]> --------- Signed-off-by: Petar Dzepina <[email protected]> Signed-off-by: Ronnak Saxena <[email protected]>
1 parent 80b470d commit dc4f9cf

File tree

3 files changed

+201
-27
lines changed

3 files changed

+201
-27
lines changed

src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt

+34-7
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55

66
package org.opensearch.indexmanagement.rollup.interceptor
77

8+
import org.apache.logging.log4j.LogManager
9+
import org.opensearch.action.support.IndicesOptions
10+
import org.opensearch.cluster.ClusterState
811
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
912
import org.opensearch.cluster.service.ClusterService
1013
import org.opensearch.common.settings.Settings
@@ -55,6 +58,8 @@ class RollupInterceptor(
5558
val indexNameExpressionResolver: IndexNameExpressionResolver
5659
) : TransportInterceptor {
5760

61+
private val logger = LogManager.getLogger(javaClass)
62+
5863
@Volatile private var searchEnabled = RollupSettings.ROLLUP_SEARCH_ENABLED.get(settings)
5964
@Volatile private var searchAllJobs = RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings)
6065

@@ -92,7 +97,7 @@ class RollupInterceptor(
9297
?: throw IllegalArgumentException("No rollup job associated with target_index")
9398
val queryFieldMappings = getQueryMetadata(
9499
request.source().query(),
95-
IndexUtils.getConcreteIndex(rollupJob.sourceIndex, concreteIndices, clusterService.state())
100+
getConcreteSourceIndex(rollupJob.sourceIndex, indexNameExpressionResolver, clusterService.state())
96101
)
97102
val aggregationFieldMappings = getAggregationMetadata(request.source().aggregations()?.aggregatorFactories)
98103
val fieldMappings = queryFieldMappings + aggregationFieldMappings
@@ -109,6 +114,26 @@ class RollupInterceptor(
109114
}
110115
}
111116
}
117+
118+
fun getConcreteSourceIndex(sourceIndex: String, resolver: IndexNameExpressionResolver, clusterState: ClusterState): String {
119+
val concreteIndexNames = resolver.concreteIndexNames(clusterState, IndicesOptions.LENIENT_EXPAND_OPEN, sourceIndex)
120+
if (concreteIndexNames.isEmpty()) {
121+
logger.warn("Cannot resolve rollup sourceIndex [$sourceIndex]")
122+
return ""
123+
}
124+
125+
var concreteIndexName: String = ""
126+
if (concreteIndexNames.size == 1 && IndexUtils.isConcreteIndex(concreteIndexNames[0], clusterState)) {
127+
concreteIndexName = concreteIndexNames[0]
128+
} else if (concreteIndexNames.size > 1) {
129+
concreteIndexName = IndexUtils.getNewestIndexByCreationDate(concreteIndexNames, clusterState)
130+
} else if (IndexUtils.isAlias(sourceIndex, clusterState) || IndexUtils.isDataStream(sourceIndex, clusterState)) {
131+
concreteIndexName = IndexUtils.getWriteIndex(sourceIndex, clusterState)
132+
?: IndexUtils.getNewestIndexByCreationDate(concreteIndexNames, clusterState) //
133+
}
134+
return concreteIndexName
135+
}
136+
112137
/*
113138
* Validate that all indices have rollup job which matches field mappings from request
114139
* TODO return compiled list of issues here instead of just throwing exception
@@ -168,16 +193,15 @@ class RollupInterceptor(
168193
return fieldMappings
169194
}
170195

171-
@Suppress("ComplexMethod", "ThrowsCount")
196+
@Suppress("ComplexMethod", "ThrowsCount", "LongMethod")
172197
private fun getQueryMetadata(
173198
query: QueryBuilder?,
174-
concreteSourceIndexName: String,
199+
concreteSourceIndexName: String?,
175200
fieldMappings: MutableSet<RollupFieldMapping> = mutableSetOf()
176201
): Set<RollupFieldMapping> {
177202
if (query == null) {
178203
return fieldMappings
179204
}
180-
181205
when (query) {
182206
is TermQueryBuilder -> {
183207
fieldMappings.add(RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, query.fieldName(), Dimension.Type.TERMS.type))
@@ -218,6 +242,9 @@ class RollupInterceptor(
218242
fieldMappings.add(RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, query.fieldName(), Dimension.Type.TERMS.type))
219243
}
220244
is QueryStringQueryBuilder -> {
245+
if (concreteSourceIndexName.isNullOrEmpty()) {
246+
throw IllegalArgumentException("Can't parse query_string query without sourceIndex mappings!")
247+
}
221248
// Throws IllegalArgumentException if unable to parse query
222249
val (queryFields, otherFields) = QueryStringQueryUtil.extractFieldsFromQueryString(query, concreteSourceIndexName)
223250
for (field in queryFields) {
@@ -231,7 +258,6 @@ class RollupInterceptor(
231258
throw IllegalArgumentException("The ${query.name} query is currently not supported in rollups")
232259
}
233260
}
234-
235261
return fieldMappings
236262
}
237263

@@ -302,10 +328,11 @@ class RollupInterceptor(
302328
private fun rewriteShardSearchForRollupJobs(request: ShardSearchRequest, matchingRollupJobs: Map<Rollup, Set<RollupFieldMapping>>) {
303329
val matchedRollup = pickRollupJob(matchingRollupJobs.keys)
304330
val fieldNameMappingTypeMap = matchingRollupJobs.getValue(matchedRollup).associateBy({ it.fieldName }, { it.mappingType })
331+
val concreteSourceIndex = getConcreteSourceIndex(matchedRollup.sourceIndex, indexNameExpressionResolver, clusterService.state())
305332
if (searchAllJobs) {
306-
request.source(request.source().rewriteSearchSourceBuilder(matchingRollupJobs.keys, fieldNameMappingTypeMap, matchedRollup.sourceIndex))
333+
request.source(request.source().rewriteSearchSourceBuilder(matchingRollupJobs.keys, fieldNameMappingTypeMap, concreteSourceIndex))
307334
} else {
308-
request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap, matchedRollup.sourceIndex))
335+
request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap, concreteSourceIndex))
309336
}
310337
}
311338
}

src/main/kotlin/org/opensearch/indexmanagement/util/IndexUtils.kt

+1-20
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ import org.opensearch.cluster.metadata.IndexAbstraction
1515
import org.opensearch.cluster.metadata.IndexMetadata
1616
import org.opensearch.common.hash.MurmurHash3
1717
import org.opensearch.common.xcontent.LoggingDeprecationHandler
18+
import org.opensearch.common.xcontent.XContentType
1819
import org.opensearch.core.xcontent.NamedXContentRegistry
1920
import org.opensearch.core.xcontent.XContentParser.Token
20-
import org.opensearch.common.xcontent.XContentType
2121
import org.opensearch.indexmanagement.IndexManagementIndices
2222
import org.opensearch.indexmanagement.IndexManagementPlugin
2323
import java.nio.ByteBuffer
@@ -234,24 +234,5 @@ class IndexUtils {
234234
return clusterState.metadata
235235
.indicesLookup[indexName]!!.type == IndexAbstraction.Type.CONCRETE_INDEX
236236
}
237-
238-
fun getConcreteIndex(indexName: String, concreteIndices: Array<String>, clusterState: ClusterState): String {
239-
240-
if (concreteIndices.isEmpty()) {
241-
throw IllegalArgumentException("ConcreteIndices list can't be empty!")
242-
}
243-
244-
var concreteIndexName: String
245-
if (concreteIndices.size == 1 && isConcreteIndex(indexName, clusterState)) {
246-
concreteIndexName = indexName
247-
} else if (isAlias(indexName, clusterState) || isDataStream(indexName, clusterState)) {
248-
concreteIndexName = getWriteIndex(indexName, clusterState)
249-
?: getNewestIndexByCreationDate(concreteIndices, clusterState) //
250-
} else {
251-
concreteIndexName = getNewestIndexByCreationDate(concreteIndices, clusterState)
252-
}
253-
254-
return concreteIndexName
255-
}
256237
}
257238
}

src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt

+166
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package org.opensearch.indexmanagement.rollup.interceptor
77

88
import org.apache.http.entity.ContentType
99
import org.apache.http.entity.StringEntity
10+
import org.junit.Assert
1011
import org.opensearch.client.ResponseException
1112
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
1213
import org.opensearch.indexmanagement.common.model.dimension.Terms
@@ -1710,4 +1711,169 @@ class RollupInterceptorIT : RollupRestTestCase() {
17101711
assertTrue("The query_string query field check failed!", e.message!!.contains("Could not find a rollup job that can answer this query because [missing field unknown_field]"))
17111712
}
17121713
}
1714+
1715+
fun `test roll up search query_string query with index pattern as source`() {
1716+
val sourceIndex = "source_111_rollup_search_qsq_98243"
1717+
val targetIndex = "target_rollup_qsq_search_98243"
1718+
1719+
createSampleIndexForQSQTest(sourceIndex)
1720+
1721+
val rollup = Rollup(
1722+
id = "basic_query_string_query_rollup_search98243",
1723+
enabled = true,
1724+
schemaVersion = 1L,
1725+
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
1726+
jobLastUpdatedTime = Instant.now(),
1727+
jobEnabledTime = Instant.now(),
1728+
description = "basic search test",
1729+
sourceIndex = "source_111*",
1730+
targetIndex = targetIndex,
1731+
metadataID = null,
1732+
roles = emptyList(),
1733+
pageSize = 10,
1734+
delay = 0,
1735+
continuous = false,
1736+
dimensions = listOf(
1737+
DateHistogram(sourceField = "event_ts", fixedInterval = "1h"),
1738+
Terms("state", "state"),
1739+
Terms("state_ext", "state_ext"),
1740+
Terms("state_ext2", "state_ext2"),
1741+
Terms("state_ordinal", "state_ordinal"),
1742+
Terms("abc test", "abc test"),
1743+
),
1744+
metrics = listOf(
1745+
RollupMetrics(
1746+
sourceField = "earnings", targetField = "earnings",
1747+
metrics = listOf(
1748+
Sum(), Min(), Max(),
1749+
ValueCount(), Average()
1750+
)
1751+
)
1752+
)
1753+
).let { createRollup(it, it.id) }
1754+
1755+
updateRollupStartTime(rollup)
1756+
1757+
waitFor {
1758+
val rollupJob = getRollup(rollupId = rollup.id)
1759+
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
1760+
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
1761+
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
1762+
}
1763+
1764+
refreshAllIndices()
1765+
1766+
// Term query
1767+
var req = """
1768+
{
1769+
"size": 0,
1770+
"query": {
1771+
"query_string": {
1772+
"query": "state:TX AND state_ext:CA AND 0",
1773+
"default_field": "state_ordinal"
1774+
}
1775+
1776+
},
1777+
"aggs": {
1778+
"earnings_total": {
1779+
"sum": {
1780+
"field": "earnings"
1781+
}
1782+
}
1783+
}
1784+
}
1785+
""".trimIndent()
1786+
var rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
1787+
assertTrue(rawRes.restStatus() == RestStatus.OK)
1788+
var rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
1789+
assertTrue(rollupRes.restStatus() == RestStatus.OK)
1790+
var rawAggRes = rawRes.asMap()["aggregations"] as Map<String, Map<String, Any>>
1791+
var rollupAggRes = rollupRes.asMap()["aggregations"] as Map<String, Map<String, Any>>
1792+
assertEquals(
1793+
"Source and rollup index did not return same min results",
1794+
rawAggRes.getValue("earnings_total")["value"],
1795+
rollupAggRes.getValue("earnings_total")["value"]
1796+
)
1797+
}
1798+
1799+
fun `test roll up search query_string query with index pattern as source deleted`() {
1800+
val sourceIndex = "source_999_rollup_search_qsq_982439"
1801+
val targetIndex = "target_rollup_qsq_search_982439"
1802+
1803+
createSampleIndexForQSQTest(sourceIndex)
1804+
1805+
val rollup = Rollup(
1806+
id = "basic_query_string_query_rollup_search982499",
1807+
enabled = true,
1808+
schemaVersion = 1L,
1809+
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
1810+
jobLastUpdatedTime = Instant.now(),
1811+
jobEnabledTime = Instant.now(),
1812+
description = "basic search test",
1813+
sourceIndex = "source_999*",
1814+
targetIndex = targetIndex,
1815+
metadataID = null,
1816+
roles = emptyList(),
1817+
pageSize = 10,
1818+
delay = 0,
1819+
continuous = false,
1820+
dimensions = listOf(
1821+
DateHistogram(sourceField = "event_ts", fixedInterval = "1h"),
1822+
Terms("state", "state"),
1823+
Terms("state_ext", "state_ext"),
1824+
Terms("state_ext2", "state_ext2"),
1825+
Terms("state_ordinal", "state_ordinal"),
1826+
Terms("abc test", "abc test"),
1827+
),
1828+
metrics = listOf(
1829+
RollupMetrics(
1830+
sourceField = "earnings", targetField = "earnings",
1831+
metrics = listOf(
1832+
Sum(), Min(), Max(),
1833+
ValueCount(), Average()
1834+
)
1835+
)
1836+
)
1837+
).let { createRollup(it, it.id) }
1838+
1839+
updateRollupStartTime(rollup)
1840+
1841+
waitFor {
1842+
val rollupJob = getRollup(rollupId = rollup.id)
1843+
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
1844+
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
1845+
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
1846+
}
1847+
1848+
refreshAllIndices()
1849+
1850+
deleteIndex(sourceIndex)
1851+
1852+
// Term query
1853+
var req = """
1854+
{
1855+
"size": 0,
1856+
"query": {
1857+
"query_string": {
1858+
"query": "state:TX AND state_ext:CA AND 0",
1859+
"default_field": "state_ordinal"
1860+
}
1861+
1862+
},
1863+
"aggs": {
1864+
"earnings_total": {
1865+
"sum": {
1866+
"field": "earnings"
1867+
}
1868+
}
1869+
}
1870+
}
1871+
""".trimIndent()
1872+
try {
1873+
client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
1874+
fail("Failure was expected when searching rollup index using qsq query when sourceIndex does not exist!")
1875+
} catch (e: ResponseException) {
1876+
Assert.assertTrue(e.message!!.contains("Can't parse query_string query without sourceIndex mappings!"))
1877+
}
1878+
}
17131879
}

0 commit comments

Comments
 (0)