@@ -40,49 +40,89 @@ import org.opensearch.indexmanagement.waitFor
4040import java.time.Instant
4141
4242abstract class RollupRestTestCase : IndexManagementRestTestCase () {
43- companion object {
44- @AfterClass
45- @JvmStatic fun clearIndicesAfterClass () {
46- wipeAllIndices()
47- }
48- }
49-
5043 @After
5144 @Suppress(" UNCHECKED_CAST" )
52- fun KillAllCancallableRunningTasks () {
45+ fun killAllCancellableRunningTasks () {
5346 client().makeRequest(" POST" , " _tasks/_cancel?actions=*" )
5447 waitFor {
5548 val response = client().makeRequest(" GET" , " _tasks" )
5649 val nodes = response.asMap()[" nodes" ] as Map <String , Any ?>
57- var hasCancallableRunningTasks = false
50+ var hasCancellableRunningTasks = false
5851 nodes.forEach {
5952 val tasks = (it.value as Map <String , Any ?>)[" tasks" ] as Map <String , Any ?>
6053 tasks.forEach { e ->
6154 if ((e.value as Map <String , Any ?>)[" cancellable" ] as Boolean ) {
62- hasCancallableRunningTasks = true
55+ hasCancellableRunningTasks = true
6356 }
6457 }
6558 }
66- assertFalse(hasCancallableRunningTasks )
59+ assertFalse(hasCancellableRunningTasks )
6760 }
6861 }
6962
7063 @Suppress(" UNCHECKED_CAST" )
71- fun waitForCancallableTasksToFinish () {
72- waitFor {
73- val response = client().makeRequest(" GET" , " _tasks" )
74- val nodes = response.asMap()[" nodes" ] as Map <String , Any ?>
75- var hasCancallableRunningTasks = false
76- nodes.forEach {
77- val tasks = (it.value as Map <String , Any ?>)[" tasks" ] as Map <String , Any ?>
78- tasks.forEach { e ->
79- if ((e.value as Map <String , Any ?>)[" cancellable" ] as Boolean ) {
80- hasCancallableRunningTasks = true
81- logger.info(" cancellable task running: ${e.key} " )
64+ protected fun stopAllRollupJobs (): List <String > {
65+ val stoppedJobIds = mutableListOf<String >()
66+ try {
67+ val response = client().makeRequest(" GET" , " $ROLLUP_JOBS_BASE_URI ?size=1000" )
68+ val rollupsList = response.asMap()[" rollups" ] as ? List <Map <String , Any ?>> ? : emptyList()
69+
70+ rollupsList.forEach { rollupMap ->
71+ val rollupObj = rollupMap[" rollup" ] as ? Map <String , Any ?> ? : return @forEach
72+ val id = rollupMap[" _id" ] as ? String ? : return @forEach
73+ val enabled = rollupObj[" enabled" ] as ? Boolean ? : false
74+
75+ if (enabled) {
76+ try {
77+ client().makeRequest(" POST" , " $ROLLUP_JOBS_BASE_URI /$id /_stop" )
78+ stoppedJobIds.add(id)
79+ logger.debug(" Stopped rollup job during test cleanup: $id " )
80+ } catch (e: Exception ) {
81+ logger.debug(" Failed to stop rollup $id during cleanup: ${e.message} " )
8282 }
8383 }
8484 }
85- assertFalse(hasCancallableRunningTasks)
85+ } catch (e: Exception ) {
86+ logger.warn(" Error stopping rollup jobs during test cleanup" , e)
87+ }
88+ return stoppedJobIds
89+ }
90+
91+ /* *
92+ * Waits for all specified rollup jobs to be disabled.
93+ *
94+ * Once a job is disabled (enabled = false), it won't start new executions.
95+ * Any in-flight execution will complete its current iteration, but since we're
96+ * wiping all indices anyway, we just need to ensure no new executions will start.
97+ */
98+ protected fun waitForRollupJobsToStop (jobIds : List <String >) {
99+ if (jobIds.isEmpty()) return
100+
101+ waitFor {
102+ val allDisabled = jobIds.all { jobId -> isRollupJobDisabled(jobId) }
103+ assertTrue(" Rollup jobs were not disabled within timeout" , allDisabled)
104+ }
105+ }
106+
107+ private fun isRollupJobDisabled (jobId : String ): Boolean =
108+ try {
109+ val rollup = getRollup(jobId)
110+ if (rollup.enabled) {
111+ logger.debug(" Waiting for rollup job $jobId to be disabled" )
112+ false
113+ } else {
114+ true
115+ }
116+ } catch (e: Exception ) {
117+ // Job might have been deleted, consider it disabled
118+ logger.debug(" Job $jobId not found: ${e.message} " )
119+ true
120+ }
121+
122+ companion object {
123+ @AfterClass
124+ @JvmStatic fun clearIndicesAfterClass () {
125+ wipeAllIndices()
86126 }
87127 }
88128
0 commit comments