Skip to content

Commit 5dc690c

Browse files
authored
Enhance per bucket, and per document monitor notification message ctx. (opensearch-project#1450)
* Adding dev logs. Signed-off-by: AWSHurneyt <[email protected]> * Added support for returning sample documents for bucket level monitors. Signed-off-by: AWSHurneyt <[email protected]> * Added support for printing query/rule info in notification messages. Signed-off-by: AWSHurneyt <[email protected]> * Extracted out helper function. Signed-off-by: AWSHurneyt <[email protected]> * Extracted out helper function. Signed-off-by: AWSHurneyt <[email protected]> * Added support for printing document data in notification messages for document level monitors. Signed-off-by: AWSHurneyt <[email protected]> * Refactored logic after making AlertContext a separate class from Alert instead of inheriting/extending it in common utils. Signed-off-by: AWSHurneyt <[email protected]> * Moved AlertContext data model from common utils to alerting plugin. Signed-off-by: AWSHurneyt <[email protected]> * Fixed ktlint errors. Signed-off-by: AWSHurneyt <[email protected]> * Added additional unit tests. Signed-off-by: AWSHurneyt <[email protected]> * Extracted sample doc aggs logic into helper function. Added support for sorting sample docs based on metric aggregations. Signed-off-by: AWSHurneyt <[email protected]> * Extracted get sample doc logic into helper function. Added sorting for sample docs. Signed-off-by: AWSHurneyt <[email protected]> * Removed dev code. Signed-off-by: AWSHurneyt <[email protected]> * Fixed ktlint errors. Signed-off-by: AWSHurneyt <[email protected]> * Added comments based on PR feedback. Signed-off-by: AWSHurneyt <[email protected]> * Added logic to make mGet calls in batches. Signed-off-by: AWSHurneyt <[email protected]> --------- Signed-off-by: AWSHurneyt <[email protected]>
1 parent ba84d04 commit 5dc690c

14 files changed

+1387
-60
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt

+119-6
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@ import org.opensearch.action.search.SearchRequest
1313
import org.opensearch.action.search.SearchResponse
1414
import org.opensearch.action.support.WriteRequest
1515
import org.opensearch.alerting.model.ActionRunResult
16+
import org.opensearch.alerting.model.AlertContext
1617
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
1718
import org.opensearch.alerting.model.InputRunResults
1819
import org.opensearch.alerting.model.MonitorRunResult
1920
import org.opensearch.alerting.opensearchapi.InjectorContextElement
21+
import org.opensearch.alerting.opensearchapi.convertToMap
2022
import org.opensearch.alerting.opensearchapi.retry
2123
import org.opensearch.alerting.opensearchapi.suspendUntil
2224
import org.opensearch.alerting.opensearchapi.withClosableContext
@@ -25,7 +27,9 @@ import org.opensearch.alerting.util.defaultToPerExecutionAction
2527
import org.opensearch.alerting.util.getActionExecutionPolicy
2628
import org.opensearch.alerting.util.getBucketKeysHash
2729
import org.opensearch.alerting.util.getCombinedTriggerRunResult
30+
import org.opensearch.alerting.util.printsSampleDocData
2831
import org.opensearch.alerting.workflow.WorkflowRunContext
32+
import org.opensearch.client.Client
2933
import org.opensearch.common.xcontent.LoggingDeprecationHandler
3034
import org.opensearch.common.xcontent.XContentType
3135
import org.opensearch.commons.alerting.model.Alert
@@ -220,6 +224,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
220224
?.addAll(monitorCtx.alertService!!.convertToCompletedAlerts(keysToAlertsMap))
221225
}
222226

227+
// The alertSampleDocs map structure is Map<TriggerId, Map<BucketKeysHash, List<Alert>>>
228+
val alertSampleDocs = mutableMapOf<String, Map<String, List<Map<String, Any>>>>()
223229
for (trigger in monitor.triggers) {
224230
val alertsToUpdate = mutableSetOf<Alert>()
225231
val completedAlertsToUpdate = mutableSetOf<Alert>()
@@ -230,6 +236,32 @@ object BucketLevelMonitorRunner : MonitorRunner() {
230236
?: mutableListOf()
231237
// Update nextAlerts so the filtered DEDUPED Alerts are reflected for PER_ALERT Action execution
232238
nextAlerts[trigger.id]?.set(AlertCategory.DEDUPED, dedupedAlerts)
239+
240+
// Only collect sample docs for triggered triggers, and only when at least 1 action prints sample doc data.
241+
val isTriggered = !nextAlerts[trigger.id]?.get(AlertCategory.NEW).isNullOrEmpty()
242+
if (isTriggered && printsSampleDocData(trigger)) {
243+
try {
244+
val searchRequest = monitorCtx.inputService!!.getSearchRequest(
245+
monitor = monitor.copy(triggers = listOf(trigger)),
246+
searchInput = monitor.inputs[0] as SearchInput,
247+
periodStart = periodStart,
248+
periodEnd = periodEnd,
249+
prevResult = monitorResult.inputResults,
250+
matchingDocIdsPerIndex = null,
251+
returnSampleDocs = true
252+
)
253+
val sampleDocumentsByBucket = getSampleDocs(
254+
client = monitorCtx.client!!,
255+
monitorId = monitor.id,
256+
triggerId = trigger.id,
257+
searchRequest = searchRequest
258+
)
259+
alertSampleDocs[trigger.id] = sampleDocumentsByBucket
260+
} catch (e: Exception) {
261+
logger.error("Error retrieving sample documents for trigger {} of monitor {}.", trigger.id, monitor.id, e)
262+
}
263+
}
264+
233265
val newAlerts = nextAlerts[trigger.id]?.get(AlertCategory.NEW) ?: mutableListOf()
234266
val completedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED) ?: mutableListOf()
235267

@@ -255,8 +287,11 @@ object BucketLevelMonitorRunner : MonitorRunner() {
255287
for (alertCategory in actionExecutionScope.actionableAlerts) {
256288
val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
257289
for (alert in alertsToExecuteActionsFor) {
290+
val alertContext = if (alertCategory != AlertCategory.NEW) AlertContext(alert = alert)
291+
else getAlertContext(alert = alert, alertSampleDocs = alertSampleDocs)
292+
258293
val actionCtx = getActionContextForAlertCategory(
259-
alertCategory, alert, triggerCtx, monitorOrTriggerError
294+
alertCategory, alertContext, triggerCtx, monitorOrTriggerError
260295
)
261296
// AggregationResultBucket should not be null here
262297
val alertBucketKeysHash = alert.aggregationResultBucket!!.getBucketKeysHash()
@@ -287,7 +322,9 @@ object BucketLevelMonitorRunner : MonitorRunner() {
287322

288323
val actionCtx = triggerCtx.copy(
289324
dedupedAlerts = dedupedAlerts,
290-
newAlerts = newAlerts,
325+
newAlerts = newAlerts.map {
326+
getAlertContext(alert = it, alertSampleDocs = alertSampleDocs)
327+
},
291328
completedAlerts = completedAlerts,
292329
error = monitorResult.error ?: triggerResult.error
293330
)
@@ -480,17 +517,93 @@ object BucketLevelMonitorRunner : MonitorRunner() {
480517

481518
private fun getActionContextForAlertCategory(
482519
alertCategory: AlertCategory,
483-
alert: Alert,
520+
alertContext: AlertContext,
484521
ctx: BucketLevelTriggerExecutionContext,
485522
error: Exception?
486523
): BucketLevelTriggerExecutionContext {
487524
return when (alertCategory) {
488525
AlertCategory.DEDUPED ->
489-
ctx.copy(dedupedAlerts = listOf(alert), newAlerts = emptyList(), completedAlerts = emptyList(), error = error)
526+
ctx.copy(dedupedAlerts = listOf(alertContext.alert), newAlerts = emptyList(), completedAlerts = emptyList(), error = error)
490527
AlertCategory.NEW ->
491-
ctx.copy(dedupedAlerts = emptyList(), newAlerts = listOf(alert), completedAlerts = emptyList(), error = error)
528+
ctx.copy(dedupedAlerts = emptyList(), newAlerts = listOf(alertContext), completedAlerts = emptyList(), error = error)
492529
AlertCategory.COMPLETED ->
493-
ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf(alert), error = error)
530+
ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf(alertContext.alert), error = error)
531+
}
532+
}
533+
534+
private fun getAlertContext(
535+
alert: Alert,
536+
alertSampleDocs: Map<String, Map<String, List<Map<String, Any>>>>
537+
): AlertContext {
538+
val bucketKey = alert.aggregationResultBucket?.getBucketKeysHash()
539+
val sampleDocs = alertSampleDocs[alert.triggerId]?.get(bucketKey)
540+
return if (!bucketKey.isNullOrEmpty() && !sampleDocs.isNullOrEmpty()) {
541+
AlertContext(alert = alert, sampleDocs = sampleDocs)
542+
} else {
543+
logger.error(
544+
"Failed to retrieve sample documents for alert {} from trigger {} of monitor {} during execution {}.",
545+
alert.id,
546+
alert.triggerId,
547+
alert.monitorId,
548+
alert.executionId
549+
)
550+
AlertContext(alert = alert, sampleDocs = listOf())
494551
}
495552
}
553+
554+
/**
555+
* Executes the monitor's query with the addition of 2 top_hits aggregations that are used to return the top 5,
556+
* and bottom 5 documents for each bucket.
557+
*
558+
* @return Map<BucketKeysHash, List<Alert>>
559+
*/
560+
@Suppress("UNCHECKED_CAST")
561+
private suspend fun getSampleDocs(
562+
client: Client,
563+
monitorId: String,
564+
triggerId: String,
565+
searchRequest: SearchRequest
566+
): Map<String, List<Map<String, Any>>> {
567+
val sampleDocumentsByBucket = mutableMapOf<String, List<Map<String, Any>>>()
568+
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
569+
val aggs = searchResponse.convertToMap().getOrDefault("aggregations", mapOf<String, Any>()) as Map<String, Any>
570+
val compositeAgg = aggs.getOrDefault("composite_agg", mapOf<String, Any>()) as Map<String, Any>
571+
val buckets = compositeAgg.getOrDefault("buckets", emptyList<Map<String, Any>>()) as List<Map<String, Any>>
572+
573+
buckets.forEach { bucket ->
574+
val bucketKey = getBucketKeysHash((bucket.getOrDefault("key", mapOf<String, String>()) as Map<String, String>).values.toList())
575+
if (bucketKey.isEmpty()) throw IllegalStateException("Cannot format bucket keys.")
576+
577+
val unwrappedTopHits = (bucket.getOrDefault("top_hits", mapOf<String, Any>()) as Map<String, Any>)
578+
.getOrDefault("hits", mapOf<String, Any>()) as Map<String, Any>
579+
val topHits = unwrappedTopHits.getOrDefault("hits", listOf<Map<String, Any>>()) as List<Map<String, Any>>
580+
581+
val unwrappedLowHits = (bucket.getOrDefault("low_hits", mapOf<String, Any>()) as Map<String, Any>)
582+
.getOrDefault("hits", mapOf<String, Any>()) as Map<String, Any>
583+
val lowHits = unwrappedLowHits.getOrDefault("hits", listOf<Map<String, Any>>()) as List<Map<String, Any>>
584+
585+
// Reversing the order of lowHits so allHits will be in descending order.
586+
val allHits = topHits + lowHits.reversed()
587+
588+
if (allHits.isEmpty()) {
589+
// We expect sample documents to be available for each bucket.
590+
logger.error("Sample documents not found for trigger {} of monitor {}.", triggerId, monitorId)
591+
}
592+
593+
// Removing duplicate hits. The top_hits, and low_hits results return a max of 5 docs each.
594+
// The same document could be present in both hit lists if there are fewer than 10 documents in the bucket of data.
595+
val uniqueHitIds = mutableSetOf<String>()
596+
val dedupedHits = mutableListOf<Map<String, Any>>()
597+
allHits.forEach { hit ->
598+
val hitId = hit["_id"] as String
599+
if (!uniqueHitIds.contains(hitId)) {
600+
uniqueHitIds.add(hitId)
601+
dedupedHits.add(hit)
602+
}
603+
}
604+
sampleDocumentsByBucket[bucketKey] = dedupedHits
605+
}
606+
607+
return sampleDocumentsByBucket
608+
}
496609
}

alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt

+77-6
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,28 @@ import org.opensearch.action.admin.indices.refresh.RefreshAction
1313
import org.opensearch.action.admin.indices.refresh.RefreshRequest
1414
import org.opensearch.action.bulk.BulkRequest
1515
import org.opensearch.action.bulk.BulkResponse
16+
import org.opensearch.action.get.MultiGetItemResponse
17+
import org.opensearch.action.get.MultiGetRequest
1618
import org.opensearch.action.index.IndexRequest
1719
import org.opensearch.action.search.SearchAction
1820
import org.opensearch.action.search.SearchRequest
1921
import org.opensearch.action.search.SearchResponse
22+
import org.opensearch.alerting.model.AlertContext
2023
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
2124
import org.opensearch.alerting.model.IndexExecutionContext
2225
import org.opensearch.alerting.model.InputRunResults
2326
import org.opensearch.alerting.model.MonitorMetadata
2427
import org.opensearch.alerting.model.MonitorRunResult
2528
import org.opensearch.alerting.model.userErrorMessage
29+
import org.opensearch.alerting.opensearchapi.convertToMap
2630
import org.opensearch.alerting.opensearchapi.suspendUntil
2731
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
2832
import org.opensearch.alerting.util.AlertingException
2933
import org.opensearch.alerting.util.IndexUtils
3034
import org.opensearch.alerting.util.defaultToPerExecutionAction
3135
import org.opensearch.alerting.util.getActionExecutionPolicy
36+
import org.opensearch.alerting.util.parseSampleDocTags
37+
import org.opensearch.alerting.util.printsSampleDocData
3238
import org.opensearch.alerting.workflow.WorkflowRunContext
3339
import org.opensearch.client.node.NodeClient
3440
import org.opensearch.cluster.metadata.IndexMetadata
@@ -64,6 +70,7 @@ import org.opensearch.percolator.PercolateQueryBuilderExt
6470
import org.opensearch.search.SearchHit
6571
import org.opensearch.search.SearchHits
6672
import org.opensearch.search.builder.SearchSourceBuilder
73+
import org.opensearch.search.fetch.subphase.FetchSourceContext
6774
import org.opensearch.search.sort.SortOrder
6875
import java.io.IOException
6976
import java.time.Instant
@@ -83,6 +90,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
8390
* Docs are fetched from the source index per shard and transformed.*/
8491
val transformedDocs = mutableListOf<Pair<String, TransformedDocDto>>()
8592

93+
// Maps a finding ID to the related document.
94+
private val findingIdToDocSource = mutableMapOf<String, MultiGetItemResponse>()
95+
8696
override suspend fun runMonitor(
8797
monitor: Monitor,
8898
monitorCtx: MonitorRunnerExecutionContext,
@@ -95,6 +105,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
95105
logger.debug("Document-level-monitor is running ...")
96106
val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
97107
var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)
108+
monitorCtx.findingsToTriggeredQueries = mutableMapOf()
98109

99110
try {
100111
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources)
@@ -455,7 +466,15 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
455466
error = monitorResult.error ?: triggerResult.error
456467
)
457468

469+
if (printsSampleDocData(trigger) && triggerFindingDocPairs.isNotEmpty())
470+
getDocSources(
471+
findingToDocPairs = findingToDocPairs,
472+
monitorCtx = monitorCtx,
473+
monitor = monitor
474+
)
475+
458476
val alerts = mutableListOf<Alert>()
477+
val alertContexts = mutableListOf<AlertContext>()
459478
triggerFindingDocPairs.forEach {
460479
val alert = monitorCtx.alertService!!.composeDocLevelAlert(
461480
listOf(it.first),
@@ -466,6 +485,18 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
466485
workflorwRunContext = workflowRunContext
467486
)
468487
alerts.add(alert)
488+
489+
val docSource = findingIdToDocSource[alert.findingIds.first()]?.response?.convertToMap()
490+
491+
alertContexts.add(
492+
AlertContext(
493+
alert = alert,
494+
associatedQueries = alert.findingIds.flatMap { findingId ->
495+
monitorCtx.findingsToTriggeredQueries?.getOrDefault(findingId, emptyList()) ?: emptyList()
496+
},
497+
sampleDocs = listOfNotNull(docSource)
498+
)
499+
)
469500
}
470501

471502
val shouldDefaultToPerExecution = defaultToPerExecutionAction(
@@ -479,13 +510,13 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
479510
for (action in trigger.actions) {
480511
val actionExecutionScope = action.getActionExecutionPolicy(monitor)!!.actionExecutionScope
481512
if (actionExecutionScope is PerAlertActionScope && !shouldDefaultToPerExecution) {
482-
for (alert in alerts) {
483-
val actionResults = this.runAction(action, actionCtx.copy(alerts = listOf(alert)), monitorCtx, monitor, dryrun)
484-
triggerResult.actionResultsMap.getOrPut(alert.id) { mutableMapOf() }
485-
triggerResult.actionResultsMap[alert.id]?.set(action.id, actionResults)
513+
for (alertContext in alertContexts) {
514+
val actionResults = this.runAction(action, actionCtx.copy(alerts = listOf(alertContext)), monitorCtx, monitor, dryrun)
515+
triggerResult.actionResultsMap.getOrPut(alertContext.alert.id) { mutableMapOf() }
516+
triggerResult.actionResultsMap[alertContext.alert.id]?.set(action.id, actionResults)
486517
}
487-
} else if (alerts.isNotEmpty()) {
488-
val actionResults = this.runAction(action, actionCtx.copy(alerts = alerts), monitorCtx, monitor, dryrun)
518+
} else if (alertContexts.isNotEmpty()) {
519+
val actionResults = this.runAction(action, actionCtx.copy(alerts = alertContexts), monitorCtx, monitor, dryrun)
489520
for (alert in alerts) {
490521
triggerResult.actionResultsMap.getOrPut(alert.id) { mutableMapOf() }
491522
triggerResult.actionResultsMap[alert.id]?.set(action.id, actionResults)
@@ -532,6 +563,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
532563
val findingDocPairs = mutableListOf<Pair<String, String>>()
533564
val findings = mutableListOf<Finding>()
534565
val indexRequests = mutableListOf<IndexRequest>()
566+
val findingsToTriggeredQueries = mutableMapOf<String, List<DocLevelQuery>>()
535567

536568
docsToQueries.forEach {
537569
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
@@ -552,6 +584,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
552584
)
553585
findingDocPairs.add(Pair(finding.id, it.key))
554586
findings.add(finding)
587+
findingsToTriggeredQueries[finding.id] = triggeredQueries
555588

556589
val findingStr =
557590
finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS)
@@ -578,6 +611,10 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
578611
// suppress exception
579612
logger.error("Optional finding callback failed", e)
580613
}
614+
615+
if (monitorCtx.findingsToTriggeredQueries == null) monitorCtx.findingsToTriggeredQueries = findingsToTriggeredQueries
616+
else monitorCtx.findingsToTriggeredQueries = monitorCtx.findingsToTriggeredQueries!! + findingsToTriggeredQueries
617+
581618
return findingDocPairs
582619
}
583620

@@ -1047,6 +1084,40 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
10471084
return numDocs >= maxNumDocsThreshold
10481085
}
10491086

1087+
/**
1088+
* Performs an mGet request to retrieve the documents associated with findings.
1089+
*
1090+
* When possible, this will only retrieve the document fields that are specifically
1091+
* referenced for printing in the mustache template.
1092+
*/
1093+
private suspend fun getDocSources(
1094+
findingToDocPairs: List<Pair<String, String>>,
1095+
monitorCtx: MonitorRunnerExecutionContext,
1096+
monitor: Monitor
1097+
) {
1098+
val docFieldTags = parseSampleDocTags(monitor.triggers)
1099+
val request = MultiGetRequest()
1100+
1101+
// Perform mGet request in batches.
1102+
findingToDocPairs.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch ->
1103+
batch.forEach { (findingId, docIdAndIndex) ->
1104+
val docIdAndIndexSplit = docIdAndIndex.split("|")
1105+
val docId = docIdAndIndexSplit[0]
1106+
val concreteIndex = docIdAndIndexSplit[1]
1107+
if (findingId.isNotEmpty() && docId.isNotEmpty() && concreteIndex.isNotEmpty()) {
1108+
val docItem = MultiGetRequest.Item(concreteIndex, docId)
1109+
if (docFieldTags.isNotEmpty())
1110+
docItem.fetchSourceContext(FetchSourceContext(true, docFieldTags.toTypedArray(), emptyArray()))
1111+
request.add(docItem)
1112+
}
1113+
val response = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.multiGet(request, it) }
1114+
response.responses.forEach { item ->
1115+
findingIdToDocSource[findingId] = item
1116+
}
1117+
}
1118+
}
1119+
}
1120+
10501121
/**
10511122
* POJO holding information about each doc's concrete index, id, input index pattern/alias/datastream name
10521123
* and doc source. A list of these POJOs would be passed to percolate query execution logic.

0 commit comments

Comments
 (0)