@@ -17,8 +17,8 @@ import org.opensearch.action.index.IndexRequest
 import org.opensearch.action.search.SearchAction
 import org.opensearch.action.search.SearchRequest
 import org.opensearch.action.search.SearchResponse
-import org.opensearch.alerting.model.DocumentExecutionContext
 import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
+import org.opensearch.alerting.model.IndexExecutionContext
 import org.opensearch.alerting.model.InputRunResults
 import org.opensearch.alerting.model.MonitorMetadata
 import org.opensearch.alerting.model.MonitorRunResult
@@ -30,7 +30,6 @@ import org.opensearch.alerting.util.IndexUtils
 import org.opensearch.alerting.util.defaultToPerExecutionAction
 import org.opensearch.alerting.util.getActionExecutionPolicy
 import org.opensearch.alerting.workflow.WorkflowRunContext
-import org.opensearch.client.Client
 import org.opensearch.client.node.NodeClient
 import org.opensearch.cluster.metadata.IndexMetadata
 import org.opensearch.cluster.routing.Preference
@@ -59,6 +58,8 @@ import org.opensearch.index.IndexNotFoundException
 import org.opensearch.index.query.BoolQueryBuilder
 import org.opensearch.index.query.Operator
 import org.opensearch.index.query.QueryBuilders
+import org.opensearch.index.seqno.SequenceNumbers
+import org.opensearch.indices.IndexClosedException
 import org.opensearch.percolator.PercolateQueryBuilderExt
 import org.opensearch.search.SearchHit
 import org.opensearch.search.SearchHits
@@ -207,7 +208,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
                 }
 
                 // Prepare updatedLastRunContext for each index
-                val indexUpdatedRunContext = updateLastRunContext(
+                val indexUpdatedRunContext = initializeNewLastRunContext(
                     indexLastRunContext.toMutableMap(),
                     monitorCtx,
                     concreteIndexName,
@@ -255,25 +256,29 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
                         " ${fieldsToBeQueried.joinToString()} instead of entire _source of documents"
                     )
                 }
-
-                // Prepare DocumentExecutionContext for each index
-                val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)
-
-                fetchShardDataAndMaybeExecutePercolateQueries(
-                    monitor,
-                    monitorCtx,
-                    docExecutionContext,
+                val indexExecutionContext = IndexExecutionContext(
+                    queries,
+                    indexLastRunContext,
+                    indexUpdatedRunContext,
                     updatedIndexName,
                     concreteIndexName,
                     conflictingFields.toList(),
                     matchingDocIdsPerIndex?.get(concreteIndexName),
+                )
+
+                fetchShardDataAndMaybeExecutePercolateQueries(
+                    monitor,
+                    monitorCtx,
+                    indexExecutionContext,
                     monitorMetadata,
                     inputRunResults,
                     docsToQueries,
                     updatedIndexNames,
                     concreteIndicesSeenSoFar,
                     ArrayList(fieldsToBeQueried)
-                )
+                ) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number
+                    indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo
+                }
             }
         }
         /* if all indices are covered still in-memory docs size limit is not breached we would need to submit
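
Note on the trailing lambda above: the per-shard max sequence number is no longer pre-computed before the fetch; the fetch routine reports it through this callback once it has actually read the shard. A minimal sketch of the callback shape, with hypothetical `fetchShards`/`runContext` names standing in for the plugin's real code:

```kotlin
// Hypothetical stand-ins for the plugin's types; only the callback shape matters here.
fun fetchShards(shardCount: Int, updateLastRunContext: (shard: String, maxSeqNo: String) -> Unit) {
    for (i in 0 until shardCount) {
        val shard = i.toString()
        val highestSeqNoSeen = 42L // placeholder: first hit of the _seq_no DESC search
        updateLastRunContext(shard, highestSeqNoSeen.toString())
    }
}

fun main() {
    val runContext = mutableMapOf<String, Any>()
    fetchShards(shardCount = 2) { shard, maxSeqNo ->
        runContext[shard] = maxSeqNo // mirrors the lambda passed in the diff above
    }
    println(runContext) // {0=42, 1=42}
}
```

The lambda in the diff does exactly this, writing into `indexExecutionContext.updatedLastRunContext`.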
@@ -615,7 +620,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
         )
     }
 
-    private suspend fun updateLastRunContext(
+    private fun initializeNewLastRunContext(
         lastRunContext: Map<String, Any>,
         monitorCtx: MonitorRunnerExecutionContext,
         index: String,
@@ -624,8 +629,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
         val updatedLastRunContext = lastRunContext.toMutableMap()
         for (i: Int in 0 until count) {
             val shard = i.toString()
-            val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard)
-            updatedLastRunContext[shard] = maxSeqNo.toString()
+            updatedLastRunContext[shard] = SequenceNumbers.UNASSIGNED_SEQ_NO.toString()
         }
         return updatedLastRunContext
     }
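
The initializer no longer issues a per-shard search; every shard entry starts at a sentinel and is overwritten later via the callback. A short sketch of the same idea, assuming only the `SequenceNumbers` class from OpenSearch core (`UNASSIGNED_SEQ_NO` is -2L, `NO_OPS_PERFORMED` is -1L):

```kotlin
import org.opensearch.index.seqno.SequenceNumbers

// Sketch of the new initialization: every shard starts at UNASSIGNED_SEQ_NO instead of
// being resolved with a per-shard search up front; the real max seq_no is filled in
// later through the updateLastRunContext callback once data for the shard is fetched.
fun initialShardContext(shardCount: Int): MutableMap<String, Any> {
    val context = mutableMapOf<String, Any>()
    for (i in 0 until shardCount) {
        context[i.toString()] = SequenceNumbers.UNASSIGNED_SEQ_NO.toString()
    }
    return context
}
```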
@@ -657,33 +661,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
         return indexCreationDate > lastExecutionTime.toEpochMilli()
     }
 
-    /**
-     * Get the current max seq number of the shard. We find it by searching the last document
-     * in the primary shard.
-     */
-    private suspend fun getMaxSeqNo(client: Client, index: String, shard: String): Long {
-        val request: SearchRequest = SearchRequest()
-            .indices(index)
-            .preference("_shards:$shard")
-            .source(
-                SearchSourceBuilder()
-                    .version(true)
-                    .sort("_seq_no", SortOrder.DESC)
-                    .seqNoAndPrimaryTerm(true)
-                    .query(QueryBuilders.matchAllQuery())
-                    .size(1)
-            )
-        val response: SearchResponse = client.suspendUntil { client.search(request, it) }
-        if (response.status() !== RestStatus.OK) {
-            throw IOException("Failed to get max seq no for shard: $shard")
-        }
-        nonPercolateSearchesTimeTakenStat += response.took.millis
-        if (response.hits.hits.isEmpty())
-            return -1L
-
-        return response.hits.hits[0].seqNo
-    }
-
     private fun getShardsCount(clusterService: ClusterService, index: String): Int {
         val allShards: List<ShardRouting> = clusterService!!.state().routingTable().allShards(index)
         return allShards.filter { it.primary() }.size
@@ -697,51 +674,79 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
     private suspend fun fetchShardDataAndMaybeExecutePercolateQueries(
         monitor: Monitor,
         monitorCtx: MonitorRunnerExecutionContext,
-        docExecutionCtx: DocumentExecutionContext,
-        indexName: String,
-        concreteIndexName: String,
-        conflictingFields: List<String>,
-        docIds: List<String>? = null,
+        indexExecutionCtx: IndexExecutionContext,
         monitorMetadata: MonitorMetadata,
         inputRunResults: MutableMap<String, MutableSet<String>>,
         docsToQueries: MutableMap<String, MutableList<String>>,
         monitorInputIndices: List<String>,
         concreteIndices: List<String>,
         fieldsToBeQueried: List<String>,
+        updateLastRunContext: (String, String) -> Unit
     ) {
-        val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
+        val count: Int = indexExecutionCtx.updatedLastRunContext["shards_count"] as Int
         for (i: Int in 0 until count) {
             val shard = i.toString()
             try {
-                val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong()
-                val prevSeqNo = docExecutionCtx.lastRunContext[shard].toString().toLongOrNull()
-
-                val hits: SearchHits = searchShard(
-                    monitorCtx,
-                    concreteIndexName,
-                    shard,
-                    prevSeqNo,
-                    maxSeqNo,
-                    docIds,
-                    fieldsToBeQueried
-                )
-                val startTime = System.currentTimeMillis()
-                transformedDocs.addAll(
-                    transformSearchHitsAndReconstructDocs(
-                        hits,
-                        indexName,
-                        concreteIndexName,
-                        monitor.id,
-                        conflictingFields,
+                val prevSeqNo = indexExecutionCtx.lastRunContext[shard].toString().toLongOrNull()
+                val from = prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED
+                var to: Long = Long.MAX_VALUE
+                while (to >= from) {
+                    val hits: SearchHits = searchShard(
+                        monitorCtx,
+                        indexExecutionCtx.concreteIndexName,
+                        shard,
+                        from,
+                        to,
+                        indexExecutionCtx.docIds,
+                        fieldsToBeQueried,
                     )
-                )
-                docTransformTimeTakenStat += System.currentTimeMillis() - startTime
+                    if (hits.hits.isEmpty()) {
+                        if (to == Long.MAX_VALUE) {
+                            updateLastRunContext(shard, (prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED).toString()) // didn't find any docs
+                        }
+                        break
+                    }
+                    if (to == Long.MAX_VALUE) { // max sequence number of shard needs to be computed
+                        updateLastRunContext(shard, hits.hits[0].seqNo.toString())
+                    }
+                    val leastSeqNoFromHits = hits.hits.last().seqNo
+                    to = leastSeqNoFromHits - 1
+                    val startTime = System.currentTimeMillis()
+                    transformedDocs.addAll(
+                        transformSearchHitsAndReconstructDocs(
+                            hits,
+                            indexExecutionCtx.indexName,
+                            indexExecutionCtx.concreteIndexName,
+                            monitor.id,
+                            indexExecutionCtx.conflictingFields,
+                        )
+                    )
+                    if (
+                        transformedDocs.isNotEmpty() &&
+                        shouldPerformPercolateQueryAndFlushInMemoryDocs(transformedDocs.size, monitorCtx)
+                    ) {
+                        performPercolateQueryAndResetCounters(
+                            monitorCtx,
+                            monitor,
+                            monitorMetadata,
+                            monitorInputIndices,
+                            concreteIndices,
+                            inputRunResults,
+                            docsToQueries,
+                        )
+                    }
+                    docTransformTimeTakenStat += System.currentTimeMillis() - startTime
+                }
             } catch (e: Exception) {
                 logger.error(
                     "Monitor ${monitor.id}:" +
-                        " Failed to run fetch data from shard [$shard] of index [$concreteIndexName]. Error: ${e.message}",
+                        " Failed to run fetch data from shard [$shard] of index [${indexExecutionCtx.concreteIndexName}]. " +
+                        "Error: ${e.message}",
                     e
                 )
+                if (e is IndexClosedException) {
+                    throw e
+                }
             }
             if (
                 transformedDocs.isNotEmpty() &&
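
The loop above pages through a shard newest-first: the first hit of the first page carries the shard's max `_seq_no` (reported via `updateLastRunContext`), each page's smallest seq_no minus one becomes the next upper bound, and paging stops on an empty page or when the window closes. In-memory docs are flushed to the percolate query whenever the size threshold is hit, so a large shard no longer has to fit in one batch. A compact sketch of just the paging arithmetic, with a hypothetical `searchPage` standing in for `searchShard`:

```kotlin
// searchPage is a hypothetical stand-in: it returns the hits' seq_nos inside (from, to],
// sorted descending, at most one page at a time. Returns the shard's max seq_no seen, if any.
fun pageShard(prevSeqNo: Long?, searchPage: (from: Long, to: Long) -> List<Long>): Long? {
    val from = prevSeqNo ?: -1L      // SequenceNumbers.NO_OPS_PERFORMED
    var to = Long.MAX_VALUE          // upper bound not known yet
    var maxSeqNoSeen: Long? = null
    while (to >= from) {
        val seqNos = searchPage(from, to)
        if (seqNos.isEmpty()) break                              // nothing (left) in this window
        if (to == Long.MAX_VALUE) maxSeqNoSeen = seqNos.first()  // first hit of first page = shard max
        to = seqNos.last() - 1                                   // next page covers strictly older docs
    }
    return maxSeqNoSeen
}
```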
@@ -833,8 +838,10 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
             .source(
                 SearchSourceBuilder()
                     .version(true)
+                    .sort("_seq_no", SortOrder.DESC)
+                    .seqNoAndPrimaryTerm(true)
                     .query(boolQueryBuilder)
-                    .size(10000)
+                    .size(monitorCtx.docLevelMonitorShardFetchSize)
             )
             .preference(Preference.PRIMARY_FIRST.type())
@@ -846,7 +853,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
         }
         val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) }
         if (response.status() !== RestStatus.OK) {
-            logger.error("Failed search shard. Response: $response")
             throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}")
         }
         nonPercolateSearchesTimeTakenStat += response.took.millis
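
With `searchShard()` now sorting on `_seq_no` descending and requesting seq_no/primary_term, the shard's max sequence number falls out of the first hit of the normal fetch, which is what made the dedicated `getMaxSeqNo()` round trip removed above redundant. A hedged one-liner illustrating that, assuming an already obtained `SearchResponse`:

```kotlin
import org.opensearch.action.search.SearchResponse

// First hit of a _seq_no DESC sorted shard search is that shard's max seq_no; null means no docs.
fun maxSeqNoFromShardResponse(response: SearchResponse): Long? =
    response.hits.hits.firstOrNull()?.seqNo
```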