Skip to content

Commit 80b470d

Browse files
opensearch-trigger-bot[bot]petardz
authored andcommitted
* initial commit Signed-off-by: Petar Dzepina <[email protected]> * initial commit Signed-off-by: Petar Dzepina <[email protected]> * reverted pkgbuild.gradle Signed-off-by: Petar Dzepina <[email protected]> * ec Signed-off-by: Petar Dzepina <[email protected]> * rework Signed-off-by: Petar Dzepina <[email protected]> * ec Signed-off-by: Petar Dzepina <[email protected]> * ec Signed-off-by: Petar Dzepina <[email protected]> * ec Signed-off-by: Petar Dzepina <[email protected]> * fixed tests Signed-off-by: Petar Dzepina <[email protected]> * changed rollup id in test Signed-off-by: Petar Dzepina <[email protected]> --------- Signed-off-by: Petar Dzepina <[email protected]> (cherry picked from commit 7ed6299) Co-authored-by: Petar Dzepina <[email protected]> Signed-off-by: Ronnak Saxena <[email protected]>
1 parent 7939003 commit 80b470d

File tree

4 files changed

+80
-9
lines changed

4 files changed

+80
-9
lines changed

src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataService.kt

+10-6
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@ import org.opensearch.action.search.SearchRequest
1919
import org.opensearch.action.search.SearchResponse
2020
import org.opensearch.client.Client
2121
import org.opensearch.common.Rounding
22+
import org.opensearch.common.time.DateFormatter
23+
import org.opensearch.common.time.DateFormatters
2224
import org.opensearch.common.unit.TimeValue
2325
import org.opensearch.common.xcontent.LoggingDeprecationHandler
24-
import org.opensearch.core.xcontent.NamedXContentRegistry
2526
import org.opensearch.common.xcontent.XContentFactory
2627
import org.opensearch.common.xcontent.XContentHelper
2728
import org.opensearch.common.xcontent.XContentType
29+
import org.opensearch.core.xcontent.NamedXContentRegistry
2830
import org.opensearch.index.query.MatchAllQueryBuilder
2931
import org.opensearch.indexmanagement.IndexManagementPlugin
3032
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
@@ -34,7 +36,7 @@ import org.opensearch.indexmanagement.rollup.model.ContinuousMetadata
3436
import org.opensearch.indexmanagement.rollup.model.Rollup
3537
import org.opensearch.indexmanagement.rollup.model.RollupMetadata
3638
import org.opensearch.indexmanagement.rollup.model.RollupStats
37-
import org.opensearch.indexmanagement.rollup.util.DATE_FIELD_EPOCH_MILLIS_FORMAT
39+
import org.opensearch.indexmanagement.rollup.util.DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT
3840
import org.opensearch.indexmanagement.util.NO_ID
3941
import org.opensearch.search.aggregations.bucket.composite.InternalComposite
4042
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder
@@ -181,7 +183,7 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont
181183
.sort(dateHistogram.sourceField, SortOrder.ASC) // TODO: figure out where nulls are sorted
182184
.trackTotalHits(false)
183185
.fetchSource(false)
184-
.docValueField(dateHistogram.sourceField, DATE_FIELD_EPOCH_MILLIS_FORMAT)
186+
.docValueField(dateHistogram.sourceField, DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT)
185187
val searchRequest = SearchRequest(rollup.sourceIndex)
186188
.source(searchSourceBuilder)
187189
.allowPartialSearchResults(false)
@@ -194,10 +196,12 @@ class RollupMetadataService(val client: Client, val xContentRegistry: NamedXCont
194196

195197
// Get the doc value field of the dateHistogram.sourceField for the first search hit converted to epoch millis
196198
// If the doc value is null or empty it will be treated the same as empty doc hits
197-
val firstHitTimestamp = response.hits.hits.first().field(dateHistogram.sourceField).getValue<String>()?.toLong()
199+
val firstHitTimestampAsString: String? = response.hits.hits.first().field(dateHistogram.sourceField).getValue<String>()
198200
?: return StartingTimeResult.NoDocumentsFound
199-
200-
return StartingTimeResult.Success(getRoundedTime(firstHitTimestamp, dateHistogram))
201+
// Parse date and extract epochMillis
202+
val formatter = DateFormatter.forPattern(DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT)
203+
val epochMillis = DateFormatters.from(formatter.parse(firstHitTimestampAsString), formatter.locale()).toInstant().toEpochMilli()
204+
return StartingTimeResult.Success(getRoundedTime(epochMillis, dateHistogram))
201205
} catch (e: RemoteTransportException) {
202206
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
203207
logger.debug("Error when getting initial start time for rollup [${rollup.id}]: $unwrappedException")

src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ import org.opensearch.action.search.SearchRequest
1212
import org.opensearch.cluster.ClusterState
1313
import org.opensearch.cluster.metadata.IndexMetadata
1414
import org.opensearch.common.xcontent.LoggingDeprecationHandler
15-
import org.opensearch.core.xcontent.NamedXContentRegistry
1615
import org.opensearch.common.xcontent.XContentHelper
17-
import org.opensearch.core.xcontent.XContentParser.Token
1816
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
1917
import org.opensearch.common.xcontent.XContentType
18+
import org.opensearch.core.xcontent.NamedXContentRegistry
19+
import org.opensearch.core.xcontent.XContentParser.Token
2020
import org.opensearch.index.query.BoolQueryBuilder
2121
import org.opensearch.index.query.BoostingQueryBuilder
2222
import org.opensearch.index.query.ConstantScoreQueryBuilder
@@ -63,6 +63,7 @@ import org.opensearch.search.aggregations.metrics.SumAggregationBuilder
6363
import org.opensearch.search.aggregations.metrics.ValueCountAggregationBuilder
6464
import org.opensearch.search.builder.SearchSourceBuilder
6565

66+
const val DATE_FIELD_STRICT_DATE_OPTIONAL_TIME_FORMAT = "strict_date_optional_time"
6667
const val DATE_FIELD_EPOCH_MILLIS_FORMAT = "epoch_millis"
6768

6869
@Suppress("ReturnCount")

src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupMetadataServiceTests.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -735,7 +735,7 @@ class RollupMetadataServiceTests : OpenSearchTestCase() {
735735

736736
// TODO: Mockito 2 supposedly should be able to mock final classes but there were errors when trying to do so
737737
// Will need to check if there is a workaround or a better way to mock getting hits.hits since this current approach is verbose
738-
val docField = DocumentField(dateHistogram.sourceField, listOf(getInstant(timestamp).toEpochMilli().toString()))
738+
val docField = DocumentField(dateHistogram.sourceField, listOf(timestamp))
739739
val searchHit = SearchHit(0)
740740
searchHit.setDocumentField(dateHistogram.sourceField, docField)
741741
val searchHits = SearchHits(arrayOf(searchHit), null, 0.0F)

src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt

+66
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import org.opensearch.rest.RestRequest
3333
import org.opensearch.rest.RestStatus
3434
import java.time.Instant
3535
import java.time.temporal.ChronoUnit
36+
import java.util.Collections.emptyMap
3637

3738
class RollupRunnerIT : RollupRestTestCase() {
3839

@@ -1253,6 +1254,71 @@ class RollupRunnerIT : RollupRestTestCase() {
12531254
assertEquals("Backing index [$backingIndex2] has to have owner rollup job with id:[${startedRollup1.id}]", rollupMetadata.failureReason)
12541255
}
12551256

1257+
fun `test rollup with date_nanos as date_histogram field`() {
1258+
val index = "date-nanos-index"
1259+
val rollupIndex = "date-nanos-index-rollup"
1260+
createIndex(
1261+
index,
1262+
Settings.EMPTY,
1263+
""""properties": {
1264+
"purchaseDate": {
1265+
"type": "date_nanos"
1266+
},
1267+
"itemName": {
1268+
"type": "keyword"
1269+
},
1270+
"itemPrice": {
1271+
"type": "float"
1272+
}
1273+
}"""
1274+
)
1275+
1276+
indexDoc(index, "1", """{"purchaseDate": 1683149130000.6497, "itemName": "shoes", "itemPrice": 100.5}""".trimIndent())
1277+
indexDoc(index, "2", """{"purchaseDate": 1683494790000, "itemName": "shoes", "itemPrice": 30.0}""".trimIndent())
1278+
indexDoc(index, "3", """{"purchaseDate": "2023-05-08T18:57:33.743656789Z", "itemName": "shoes", "itemPrice": 60.592}""".trimIndent())
1279+
1280+
refreshAllIndices()
1281+
1282+
val job = Rollup(
1283+
id = "rollup_with_alias_992434131",
1284+
schemaVersion = 1L,
1285+
enabled = true,
1286+
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.DAYS),
1287+
jobLastUpdatedTime = Instant.now(),
1288+
jobEnabledTime = Instant.now(),
1289+
description = "basic change of page size",
1290+
sourceIndex = index,
1291+
targetIndex = rollupIndex,
1292+
metadataID = null,
1293+
roles = emptyList(),
1294+
pageSize = 1000,
1295+
delay = 0,
1296+
continuous = true,
1297+
dimensions = listOf(
1298+
DateHistogram(sourceField = "purchaseDate", fixedInterval = "5d"),
1299+
Terms("itemName", "itemName"),
1300+
),
1301+
metrics = listOf(
1302+
RollupMetrics(
1303+
sourceField = "itemPrice",
1304+
targetField = "itemPrice",
1305+
metrics = listOf(Sum(), Min(), Max(), ValueCount(), Average())
1306+
)
1307+
)
1308+
).let { createRollup(it, it.id) }
1309+
1310+
updateRollupStartTime(job)
1311+
1312+
waitFor { assertTrue("Target rollup index was not created", indexExists(rollupIndex)) }
1313+
1314+
waitFor {
1315+
val rollupJob = getRollup(rollupId = job.id)
1316+
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
1317+
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
1318+
assertEquals("Rollup is not started", RollupMetadata.Status.STARTED, rollupMetadata.status)
1319+
}
1320+
}
1321+
12561322
// TODO: Test scenarios:
12571323
// - Source index deleted after first execution
12581324
// * If this is with a source index pattern and the underlying indices are recreated but with different data

0 commit comments

Comments
 (0)