Skip to content

Commit 65acca1

Browse files
authored
doc-level monitor fan-out approach (opensearch-project#1496)
Signed-off-by: Subhobrata Dey <[email protected]>
1 parent 71d0364 commit 65acca1

34 files changed

+2497
-913
lines changed

alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt

+19-3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.alerting
77

88
import org.opensearch.action.ActionRequest
9+
import org.opensearch.alerting.action.DocLevelMonitorFanOutAction
910
import org.opensearch.alerting.action.ExecuteMonitorAction
1011
import org.opensearch.alerting.action.ExecuteWorkflowAction
1112
import org.opensearch.alerting.action.GetDestinationsAction
@@ -55,6 +56,7 @@ import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
5556
import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction
5657
import org.opensearch.alerting.transport.TransportDeleteMonitorAction
5758
import org.opensearch.alerting.transport.TransportDeleteWorkflowAction
59+
import org.opensearch.alerting.transport.TransportDocLevelMonitorFanOutAction
5860
import org.opensearch.alerting.transport.TransportExecuteMonitorAction
5961
import org.opensearch.alerting.transport.TransportExecuteWorkflowAction
6062
import org.opensearch.alerting.transport.TransportGetAlertsAction
@@ -220,6 +222,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
220222
ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java),
221223
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java),
222224
ActionPlugin.ActionHandler(GetRemoteIndexesAction.INSTANCE, TransportGetRemoteIndexesAction::class.java),
225+
ActionPlugin.ActionHandler(DocLevelMonitorFanOutAction.INSTANCE, TransportDocLevelMonitorFanOutAction::class.java)
223226
)
224227
}
225228

@@ -254,6 +257,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
254257
val settings = environment.settings()
255258
val lockService = LockService(client, clusterService)
256259
alertIndices = AlertIndices(settings, client, threadPool, clusterService)
260+
val alertService = AlertService(client, xContentRegistry, alertIndices)
261+
val triggerService = TriggerService(scriptService)
257262
runner = MonitorRunnerService
258263
.registerClusterService(clusterService)
259264
.registerClient(client)
@@ -264,8 +269,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
264269
.registerThreadPool(threadPool)
265270
.registerAlertIndices(alertIndices)
266271
.registerInputService(InputService(client, scriptService, namedWriteableRegistry, xContentRegistry, clusterService, settings))
267-
.registerTriggerService(TriggerService(scriptService))
268-
.registerAlertService(AlertService(client, xContentRegistry, alertIndices))
272+
.registerTriggerService(triggerService)
273+
.registerAlertService(alertService)
269274
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
270275
.registerJvmStats(JvmStats.jvmStats())
271276
.registerWorkflowService(WorkflowService(client, xContentRegistry))
@@ -296,7 +301,17 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
296301

297302
DeleteMonitorService.initialize(client, lockService)
298303

299-
return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator, lockService)
304+
return listOf(
305+
sweeper,
306+
scheduler,
307+
runner,
308+
scheduledJobIndices,
309+
docLevelMonitorQueries,
310+
destinationMigrationCoordinator,
311+
lockService,
312+
alertService,
313+
triggerService
314+
)
300315
}
301316

302317
override fun getSettings(): List<Setting<*>> {
@@ -327,6 +342,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
327342
AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD,
328343
AlertingSettings.ALERTING_MAX_MONITORS,
329344
AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
345+
AlertingSettings.DOC_LEVEL_MONITOR_FAN_OUT_NODES,
330346
DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
331347
AlertingSettings.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY,
332348
AlertingSettings.REQUEST_TIMEOUT,

alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ import org.opensearch.search.aggregations.AggregatorFactories
5555
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
5656
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
5757
import org.opensearch.search.builder.SearchSourceBuilder
58+
import org.opensearch.transport.TransportService
5859
import java.time.Instant
5960
import java.util.UUID
6061

@@ -68,7 +69,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
6869
periodEnd: Instant,
6970
dryrun: Boolean,
7071
workflowRunContext: WorkflowRunContext?,
71-
executionId: String
72+
executionId: String,
73+
transportService: TransportService
7274
): MonitorRunResult<BucketLevelTriggerRunResult> {
7375
val roles = MonitorRunnerService.getRolesForMonitor(monitor)
7476
logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}")

0 commit comments

Comments
 (0)