diff --git a/CHANGELOG.md b/CHANGELOG.md
index fc7bdcbbbd..ee3da9c977 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,10 @@
 
 ## Unreleased
 
+- Add `--shard-split-dynamic` for work-stealing parallel test execution: flows are distributed across N devices via a shared queue, so faster devices pick up new flows immediately instead of sitting idle
+- Add `--min-healthy-devices` to set a minimum threshold of healthy devices; aborts the run if too many devices crash to prevent a single surviving device from running the full queue
+- Automatically stop the app after each flow to free device memory, preventing slowdown in long test suites
+
 ## 2.6.0
 
 - Add Maestro Viewer - a desktop visualizer for live hierarchy and commands
diff --git a/e2e/cli/test-cli.sh b/e2e/cli/test-cli.sh
index d3a01141ae..8e65d580e8 100755
--- a/e2e/cli/test-cli.sh
+++ b/e2e/cli/test-cli.sh
@@ -48,6 +48,12 @@ check "maestro test subcommand gives usage instructions when called with --help"
 check "maestro bugreport gives instruction" \
   "maestro bugreport" includes "https://github.com/mobile-dev-inc/Maestro/issues"
 
+check "maestro test --help includes --shard-split-dynamic flag" \
+  "maestro test --help" includes "--shard-split-dynamic"
+
+check "maestro test --help includes --min-healthy-devices flag" \
+  "maestro test --help" includes "--min-healthy-devices"
+
 echo ""
 echo "$PASS passed, $FAIL failed"
 [ $FAIL -eq 0 ]
diff --git a/install-dynamic.sh b/install-dynamic.sh
new file mode 100644
index 0000000000..55456bf23e
--- /dev/null
+++ b/install-dynamic.sh
@@ -0,0 +1,48 @@
+#!/bin/sh
+# Installs the Maestro fork with --shard-split-dynamic support.
+# Drop-in replacement for the official https://get.maestro.mobile.dev installer.
+#
+# Usage:
+#   curl -Ls https://raw.githubusercontent.com/thamys-moraes/Maestro/feature/dynamic-sharding/install-dynamic.sh | bash
+#
+# The archive is downloaded and unpacked into a temporary directory first; the
+# existing installation is only replaced once that has succeeded, so a failed
+# download never leaves the machine without a working Maestro.
+
+set -e
+
+RELEASE_URL="https://github.com/thamys-moraes/Maestro/releases/download/v2.6.0-dynamic/maestro-dynamic-macos.zip"
+INSTALL_DIR="$HOME/.maestro"
+
+echo "Installing Maestro fork (v2.6.0-dynamic)..."
+
+# Download and extract into a scratch directory that is removed on every exit path.
+TMP=$(mktemp -d)
+trap 'rm -rf "$TMP"' EXIT
+# -f: fail on HTTP errors instead of saving the error page as maestro.zip.
+curl -fsSL "$RELEASE_URL" -o "$TMP/maestro.zip"
+unzip -q "$TMP/maestro.zip" -d "$TMP"
+
+# Refuse to touch the current installation if the archive has an unexpected layout.
+if [ ! -d "$TMP/maestro/bin" ] || [ ! -d "$TMP/maestro/lib" ]; then
+  echo "Unexpected archive layout: maestro/bin or maestro/lib is missing" >&2
+  exit 1
+fi
+
+# Replace the previous installation now that the new one is known to be complete.
+mkdir -p "$INSTALL_DIR"
+rm -rf "$INSTALL_DIR/bin" "$INSTALL_DIR/lib"
+cp -r "$TMP/maestro/bin" "$INSTALL_DIR/bin"
+cp -r "$TMP/maestro/lib" "$INSTALL_DIR/lib"
+
+# Ensure PATH (exact, colon-delimited match instead of a regex search)
+case ":$PATH:" in
+  *":$INSTALL_DIR/bin:"*) ;;
+  *)
+    echo "export PATH=\"\$HOME/.maestro/bin:\$PATH\"" >> "$HOME/.zshrc" 2>/dev/null || true
+    echo "export PATH=\"\$HOME/.maestro/bin:\$PATH\"" >> "$HOME/.bash_profile" 2>/dev/null || true
+    ;;
+esac
+
+export PATH="$INSTALL_DIR/bin:$PATH"
+echo "Installed: $(maestro --version)"
diff --git a/maestro-cli/src/main/java/maestro/cli/command/TestCommand.kt b/maestro-cli/src/main/java/maestro/cli/command/TestCommand.kt
index 09f2f91e2a..7afeb6ac49 100644
--- a/maestro-cli/src/main/java/maestro/cli/command/TestCommand.kt
+++ b/maestro-cli/src/main/java/maestro/cli/command/TestCommand.kt
@@ -42,6 +42,7 @@ import maestro.cli.model.TestExecutionSummary
 import maestro.cli.report.ReportFormat
 import maestro.cli.report.ReporterFactory
 import maestro.cli.report.TestDebugReporter
+import maestro.cli.runner.DynamicShardScheduler
 import maestro.cli.runner.TestRunner
 import maestro.cli.runner.TestSuiteInteractor
 import maestro.cli.runner.resultview.AnsiResultView
@@ -121,6 +122,22 @@ class TestCommand : Callable<Int> {
     )
     private var shardAll: Int? = null
 
+    @Option(
+        names = ["--shard-split-dynamic"],
+        description = ["Distribute flows dynamically across N devices using a shared queue (work-stealing). " +
+                "Mutually exclusive with --shard-split and --shard-all."],
+    )
+    private var shardSplitDynamic: Int? = null
+
+    @Option(
+        names = ["--min-healthy-devices"],
+        description = ["Minimum number of healthy devices required for --shard-split-dynamic to continue. " +
+                "If alive devices drop below this threshold the run is cancelled. (default: \${DEFAULT-VALUE})"],
+        defaultValue = "2",
+    )
+    private var minHealthyDevices: Int = 2
+
+
     @Option(names = ["-c", "--continuous"])
     private var continuous: Boolean = false
 
@@ -256,6 +273,12 @@ class TestCommand : Callable<Int> {
         if (shardSplit != null && shardAll != null) {
             throw CliError("Options --shard-split and --shard-all are mutually exclusive.")
         }
+        if (shardSplitDynamic != null && (shardSplit != null || shardAll != null)) {
+            throw CliError("Option --shard-split-dynamic is mutually exclusive with --shard-split and --shard-all.")
+        }
+        if ((shardSplitDynamic ?: 1) < 1) {
+            throw CliError("Option --shard-split-dynamic requires at least 1 device.")
+        }
 
         @Suppress("DEPRECATION")
         if (legacyShardCount != null) {
@@ -356,7 +379,7 @@ class TestCommand : Callable<Int> {
     }
 
     private fun handleSessions(debugOutputPath: Path, plan: ExecutionPlan, testOutputDir: Path?): Int = runBlocking(Dispatchers.IO) {
-        val requestedShards = shardSplit ?: shardAll ?: 1
+        val requestedShards = shardSplit ?: shardAll ?: shardSplitDynamic ?: 1
         if (requestedShards > 1 && plan.sequence.flows.isNotEmpty()) {
             error("Cannot run sharded tests with sequential execution")
         }
@@ -412,6 +435,52 @@ class TestCommand : Callable<Int> {
                 "Will use $effectiveShards shards instead."
         if (shardAll == null && requestedShards > plan.flowsToRun.size) PrintUtils.warn(warning)
 
+        // === Dynamic work-stealing path ===
+        // Copy the option into a local so it smart-casts to Int and no `!!` is needed.
+        val dynamicShards = shardSplitDynamic
+        if (dynamicShards != null) {
+            val workers = dynamicShards.coerceAtMost(deviceIds.size)
+            val flowCount = plan.flowsToRun.size
+            PrintUtils.info(
+                "Will dynamically distribute $flowCount flows across $workers devices " +
+                    "(shared queue, min-healthy: $minHealthyDevices)"
+            )
+
+            if (flowCount > 5) showCloudFasterResultsPromotionMessageIfNeeded()
+
+            val summaries = DynamicShardScheduler(
+                plan = plan,
+                deviceIds = deviceIds.take(workers),
+                minHealthyDevices = minHealthyDevices,
+                env = env,
+                debugOutputPath = debugOutputPath,
+                testOutputDir = testOutputDir,
+                host = parent?.host,
+                port = parent?.port,
+                teamId = appleTeamId,
+                platform = platform ?: parent?.platform,
+                isHeadless = headless,
+                screenSize = screenSize,
+                reinstallDriver = reinstallDriver,
+                reporter = ReporterFactory.buildReporter(format, testSuiteName),
+                captureSteps = format == ReportFormat.HTML_DETAILED,
+            ).run()
+
+            val passed = summaries.sumOf { it.passedCount ?: 0 }
+            val total = summaries.sumOf { it.totalTests ?: 0 }
+            // A flow that never ran has no result at all, so `passed == total` alone would report
+            // success for it; also require that every planned flow was actually executed.
+            val allPassed = passed == total && total == flowCount
+
+            if (!allPassed) showCloudDebugPromotionMessageIfNeeded()
+            summaries.mergeSummaries()?.saveReport()
+            if (summaries.size > 1) printShardsMessage(passed, total, summaries)
+            if (analyze) TestAnalysisManager(apiUrl = apiUrl, apiKey = apiKey).runAnalysis(debugOutputPath)
+
+            return@runBlocking if (allPassed) 0 else 1
+        }
+
+        // === Static (existing) path ===
         val chunkPlans = makeChunkPlans(plan, effectiveShards, onlySequenceFlows)
 
         val flowCount = if (onlySequenceFlows) plan.sequence.flows.size else plan.flowsToRun.size
diff --git a/maestro-cli/src/main/java/maestro/cli/runner/DynamicShardScheduler.kt b/maestro-cli/src/main/java/maestro/cli/runner/DynamicShardScheduler.kt
new file mode 100644
index 0000000000..cab1330b84
--- /dev/null
+++ b/maestro-cli/src/main/java/maestro/cli/runner/DynamicShardScheduler.kt
@@ -0,0
+1,183 @@
+package maestro.cli.runner
+
+import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.CoroutineName
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.async
+import kotlinx.coroutines.awaitAll
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.runBlocking
+import maestro.cli.CliError
+import maestro.cli.model.TestExecutionSummary
+import maestro.cli.report.ReportFormat
+import maestro.cli.report.TestSuiteReporter
+import maestro.cli.session.MaestroSessionManager
+import maestro.orchestra.workspace.WorkspaceExecutionPlanner.ExecutionPlan
+import org.slf4j.LoggerFactory
+import java.net.ServerSocket
+import java.nio.file.Path
+import java.util.Collections
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.AtomicReference
+
+/**
+ * Distributes flows across devices using a shared queue (work-stealing).
+ *
+ * Each device runs as an independent worker: it opens a single Maestro session and consumes
+ * flows from the shared queue one at a time until the queue is empty. Devices that finish
+ * early pick up remaining flows rather than sitting idle.
+ *
+ * Fail-fast: when the number of alive workers drops below [minHealthyDevices], the run is
+ * aborted with a [CliError] to prevent a single surviving device from running all remaining flows.
+ *
+ * Re-enqueue: if a worker's session crashes mid-flow, the in-progress flow is returned to the
+ * queue and another worker will pick it up (as long as enough healthy devices remain).
+ *
+ * Completeness: if the workers stop while flows are still pending (every device was lost
+ * without tripping the [minHealthyDevices] threshold, or [deviceIds] is empty), [run] throws a
+ * [CliError] instead of returning summaries that silently omit the flows that never ran.
+ */
+class DynamicShardScheduler(
+    private val plan: ExecutionPlan,
+    private val deviceIds: List<String>,
+    private val minHealthyDevices: Int,
+    private val env: Map<String, String>,
+    private val debugOutputPath: Path,
+    private val testOutputDir: Path?,
+    private val host: String?,
+    private val port: Int?,
+    private val teamId: String?,
+    private val platform: String?,
+    private val isHeadless: Boolean,
+    private val screenSize: String?,
+    private val reinstallDriver: Boolean,
+    private val reporter: TestSuiteReporter,
+    private val captureSteps: Boolean,
+) {
+
+    private val logger = LoggerFactory.getLogger(DynamicShardScheduler::class.java)
+
+    /**
+     * Runs every flow of [plan] on [deviceIds] and returns one summary per worker whose
+     * session block ran to completion.
+     *
+     * @throws CliError if the number of healthy devices fell below [minHealthyDevices], or if
+     * the workers stopped while flows were still waiting to be executed.
+     */
+    suspend fun run(): List<TestExecutionSummary> = coroutineScope {
+        val flowQueue = Channel<Path>(Channel.UNLIMITED)
+        val pending = AtomicInteger(plan.flowsToRun.size)
+        val aliveWorkers = AtomicInteger(deviceIds.size)
+        val cancelled = AtomicBoolean(false)
+        val cancellationReason = AtomicReference<String?>()
+        val summaries = Collections.synchronizedList(mutableListOf<TestExecutionSummary>())
+
+        plan.flowsToRun.forEach { flowQueue.trySend(it) }
+
+        val jobs = deviceIds.mapIndexed { workerIndex, deviceId ->
+            async(Dispatchers.IO + CoroutineName("worker-$workerIndex")) {
+                if (cancelled.get()) return@async
+
+                // A worker can hit both failure paths (a mid-flow crash reported through
+                // onDeviceCrash, then an exception from the session call itself); this guard
+                // makes sure it is subtracted from the alive count only once.
+                val reportedDead = AtomicBoolean(false)
+                val markDead: () -> Unit = {
+                    if (reportedDead.compareAndSet(false, true)) {
+                        decrementAndCheckAlive(workerIndex, deviceId, aliveWorkers, cancelled, cancellationReason)
+                    }
+                }
+
+                val driverHostPort = selectPort()
+                try {
+                    MaestroSessionManager.newSession(
+                        host = host,
+                        port = port,
+                        teamId = teamId,
+                        driverHostPort = driverHostPort,
+                        deviceId = deviceId,
+                        platform = platform,
+                        isHeadless = isHeadless,
+                        screenSize = screenSize,
+                        reinstallDriver = reinstallDriver,
+                        executionPlan = plan,
+                    ) { session ->
+                        val interactor = TestSuiteInteractor(
+                            maestro = session.maestro,
+                            device = session.device,
+                            shardIndex = workerIndex,
+                            reporter = reporter,
+                            captureSteps = captureSteps,
+                        )
+                        // NOTE(review): runBlocking bridges into the suspending queue loop, presumably
+                        // because newSession's callback is not a suspend lambda -- confirm.
+                        val summary = runBlocking {
+                            interactor.runFromQueue(
+                                flowQueue = flowQueue,
+                                pending = pending,
+                                cancelled = cancelled,
+                                onDeviceCrash = { crashedFlow ->
+                                    // Return the flow to the queue for another worker to pick up.
+                                    flowQueue.trySend(crashedFlow)
+                                    markDead()
+                                },
+                                env = env,
+                                debugOutputPath = debugOutputPath,
+                                testOutputDir = testOutputDir,
+                                deviceId = deviceId,
+                            )
+                        }
+                        summaries.add(summary)
+                    }
+                } catch (e: CancellationException) {
+                    // Coroutine cancellation is not a device failure; let it propagate.
+                    throw e
+                } catch (e: Exception) {
+                    // The session could not be opened, or the session call failed later on.
+                    logger.error("[worker-$workerIndex] Session for device $deviceId failed: ${e.message}", e)
+                    markDead()
+                }
+            }
+        }
+
+        jobs.awaitAll()
+
+        cancellationReason.get()?.let { throw CliError(it) }
+
+        // Flows still pending here were never run by anyone; fail loudly rather than return
+        // summaries that only cover the flows that happened to finish.
+        val unexecuted = pending.get()
+        if (unexecuted > 0) {
+            throw CliError("Dynamic run ended with $unexecuted flow(s) not executed: no healthy device was left")
+        }
+
+        summaries
+    }
+
+    /**
+     * Records that the worker for [deviceId] is gone and, when the number of remaining workers
+     * drops below [minHealthyDevices], flags the run as cancelled and stores the abort message.
+     * Only the first caller to trip the threshold writes the message.
+     */
+    private fun decrementAndCheckAlive(
+        workerIndex: Int,
+        deviceId: String,
+        aliveWorkers: AtomicInteger,
+        cancelled: AtomicBoolean,
+        cancellationReason: AtomicReference<String?>,
+    ) {
+        val alive = aliveWorkers.decrementAndGet()
+        logger.warn("[worker-$workerIndex] Device $deviceId is no longer healthy. Alive workers: $alive (minimum: $minHealthyDevices)")
+        if (alive < minHealthyDevices && !cancelled.getAndSet(true)) {
+            cancellationReason.set(
+                "Aborting dynamic run: only $alive healthy device(s) remaining, minimum is $minHealthyDevices"
+            )
+        }
+    }
+
+    // Asks the OS for a free port. The probe socket is closed before the port is returned, so
+    // nothing reserves it until the driver actually binds.
+    private fun selectPort(): Int = ServerSocket(0).use { it.localPort }
+}
diff --git a/maestro-cli/src/main/java/maestro/cli/runner/TestSuiteInteractor.kt b/maestro-cli/src/main/java/maestro/cli/runner/TestSuiteInteractor.kt
index c8426fe631..693bc40706 100644
--- a/maestro-cli/src/main/java/maestro/cli/runner/TestSuiteInteractor.kt
+++ b/maestro-cli/src/main/java/maestro/cli/runner/TestSuiteInteractor.kt
@@ -1,5 +1,6 @@
 package maestro.cli.runner
 
+import kotlinx.coroutines.channels.Channel
 import maestro.Maestro
 import maestro.MaestroException
 import maestro.cli.CliError
@@ -27,6 +28,8 @@ import okio.Sink
 import org.slf4j.LoggerFactory
 import java.io.File
 import java.nio.file.Path
+import java.util.concurrent.atomic.AtomicBoolean
+import
java.util.concurrent.atomic.AtomicInteger
 import kotlin.system.measureTimeMillis
 import kotlin.time.Duration.Companion.seconds
 import maestro.cli.util.ScreenshotUtils
@@ -107,6 +110,74 @@ class TestSuiteInteractor(
 
         }
 
+        val summary = buildSummary(flowResults, aiOutputs, passed, debugOutputPath)
+
+        if (reportOut != null) {
+            reporter.report(summary, reportOut)
+        }
+
+        return summary
+    }
+
+    /**
+     * Work-stealing loop for [maestro.cli.runner.DynamicShardScheduler]: runs flows taken from
+     * [flowQueue] until [pending] reaches zero or [cancelled] is set, polling every 50 ms while the
+     * queue is empty. [pending] is decremented only once [runFlow] has returned; if it throws,
+     * [onDeviceCrash] gets the flow back and this worker returns a failed summary. NOTE(review):
+     * confirm a lost device surfaces here as an exception and not as an ERROR result from [runFlow].
+     */
+    suspend fun runFromQueue(
+        flowQueue: Channel<Path>,
+        pending: AtomicInteger,
+        cancelled: AtomicBoolean,
+        onDeviceCrash: (Path) -> Unit,
+        env: Map<String, String>,
+        debugOutputPath: Path,
+        testOutputDir: Path?,
+        deviceId: String?,
+    ): TestExecutionSummary {
+        val flowResults = mutableListOf<TestExecutionSummary.FlowResult>()
+        val aiOutputs = mutableListOf<FlowAIOutput>()
+        var passed = true
+
+        while (pending.get() > 0 && !cancelled.get()) {
+            val flowPath = flowQueue.tryReceive().getOrNull() ?: run {
+                // Queue temporarily empty: a re-enqueue may be in-flight; spin briefly.
+                kotlinx.coroutines.delay(50)
+                continue
+            }
+
+            var completed = false
+            try {
+                val flowFile = flowPath.toFile()
+                val updatedEnv = env
+                    .withInjectedShellEnvVars()
+                    .withDefaultEnvVars(flowFile, deviceId, shardIndex)
+
+                val (result, aiOutput) = runFlow(flowFile, updatedEnv, maestro, debugOutputPath, testOutputDir)
+                flowResults.add(result)
+                aiOutputs.add(aiOutput)
+                if (result.status == FlowStatus.ERROR) passed = false
+                completed = true
+            } catch (e: Exception) {
+                if (e is kotlinx.coroutines.CancellationException) throw e // cancellation is not a device crash
+                logger.error("${shardPrefix}Session crashed on flow ${flowPath.fileName}: ${e.message}", e)
+                onDeviceCrash(flowPath)
+                return buildSummary(flowResults, aiOutputs, passed = false, debugOutputPath = debugOutputPath)
+            } finally {
+                if (completed) pending.decrementAndGet()
+            }
+        }
+
+        return buildSummary(flowResults, aiOutputs, passed, debugOutputPath)
+    }
+
+    private fun buildSummary(
+        flowResults: List<TestExecutionSummary.FlowResult>,
+        aiOutputs: List<FlowAIOutput>,
+        passed: Boolean,
+        debugOutputPath: Path,
+    ): TestExecutionSummary {
         val suiteDuration = flowResults.sumOf { it.duration?.inWholeSeconds ?: 0 }.seconds
 
         TestSuiteStatusView.showSuiteResult(
@@ -114,19 +185,20 @@ class TestSuiteInteractor(
                 status = if (passed) FlowStatus.SUCCESS else FlowStatus.ERROR,
                 duration = suiteDuration,
                 shardIndex = shardIndex,
-                flows = flowResults
-                    .map {
-                        TestSuiteViewModel.FlowResult(
-                            name = it.name,
-                            status = it.status,
-                            duration = it.duration,
-                        )
-                    },
+                flows = flowResults.map {
+                    TestSuiteViewModel.FlowResult(
+                        name = it.name,
+                        status = it.status,
+                        duration = it.duration,
+                    )
+                },
             ),
             uploadUrl = ""
         )
 
-        val summary = TestExecutionSummary(
+        TestDebugReporter.saveSuggestions(aiOutputs, debugOutputPath)
+
+        return TestExecutionSummary(
             passed = passed,
             suites = listOf(
                 TestExecutionSummary.SuiteResult(
@@ -137,20 +209,8 @@ class TestSuiteInteractor(
                 )
             ),
             passedCount = flowResults.count { it.status == FlowStatus.SUCCESS },
-            totalTests = flowResults.size
+            totalTests = flowResults.size,
         )
-
-        if (reportOut != null) {
-            reporter.report(
-                summary,
-                reportOut,
-            )
-        }
-
-        // TODO(bartekpacia): Should it also be saving to debugOutputPath?
- TestDebugReporter.saveSuggestions(aiOutputs, debugOutputPath) - - return summary } private suspend fun runFlow( @@ -176,6 +236,7 @@ class TestSuiteInteractor( .readCommands(flowFile.toPath()) .withEnv(env) + val launchedAppId = commands.firstNotNullOfOrNull { it.launchAppCommand?.appId } val maestroConfig = YamlCommandReader.getConfig(commands) val flowName: String = maestroConfig?.name ?: flowFile.nameWithoutExtension @@ -250,6 +311,13 @@ class TestSuiteInteractor( logger.error("${shardPrefix}Failed to complete flow", e) flowStatus = FlowStatus.ERROR errorMessage = ErrorViewUtils.exceptionToMessage(e) + } finally { + // Stop the app to free device memory before the next flow runs. + // App ID is resolved from the flow's own launchApp command — no extra config needed. + launchedAppId?.let { appId -> + runCatching { maestro.stopApp(appId) } + .onFailure { e -> logger.debug("${shardPrefix}stopApp($appId) skipped: ${e.message}") } + } } } val flowDuration = TimeUtils.durationInSeconds(flowTimeMillis) diff --git a/maestro-cli/src/test/kotlin/maestro/cli/command/TestCommandTest.kt b/maestro-cli/src/test/kotlin/maestro/cli/command/TestCommandTest.kt index 17342b7dd9..da17a1ba07 100644 --- a/maestro-cli/src/test/kotlin/maestro/cli/command/TestCommandTest.kt +++ b/maestro-cli/src/test/kotlin/maestro/cli/command/TestCommandTest.kt @@ -1,11 +1,14 @@ package maestro.cli.command import com.google.common.truth.Truth.assertThat +import maestro.cli.CliError import maestro.orchestra.workspace.WorkspaceExecutionPlanner import maestro.orchestra.WorkspaceConfig import org.junit.jupiter.api.Test import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.assertThrows import java.nio.file.Path +import java.lang.reflect.Field class TestCommandTest { @@ -174,6 +177,38 @@ class TestCommandTest { assertThat(result).isFalse() } + /***************************************** + ***** --shard-split-dynamic flag tests *** + ******************************************/ + + @Test + fun 
`call should throw CliError when shard-split-dynamic and shard-split are both set`() {
+        setPrivateField(testCommand, "shardSplitDynamic", 3)
+        setPrivateField(testCommand, "shardSplit", 3)
+        setPrivateField(testCommand, "flowFiles", setOf(java.io.File(".")))
+
+        assertThrows<CliError> { testCommand.call() }
+    }
+
+    @Test
+    fun `call should throw CliError when shard-split-dynamic and shard-all are both set`() {
+        setPrivateField(testCommand, "shardSplitDynamic", 3)
+        setPrivateField(testCommand, "shardAll", 3)
+        setPrivateField(testCommand, "flowFiles", setOf(java.io.File(".")))
+
+        assertThrows<CliError> { testCommand.call() }
+    }
+
+    @Test
+    fun `shardSplitDynamic and shardSplit are mutually exclusive regardless of order`() {
+        setPrivateField(testCommand, "shardSplit", 2)
+        setPrivateField(testCommand, "shardSplitDynamic", 2)
+        setPrivateField(testCommand, "flowFiles", setOf(java.io.File(".")))
+
+        val thrown = assertThrows<CliError> { testCommand.call() }
+        assertThat(thrown.message).contains("mutually exclusive")
+    }
+
     /*****************************************
      ************ Common Functions ************
      ******************************************/
@@ -182,4 +217,14 @@ class TestCommandTest {
         requireNotNull(resourceUrl) { "Test resource not found: $resourcePath" }
         return Path.of(resourceUrl.toURI())
     }
+
+    private fun setPrivateField(target: Any, fieldName: String, value: Any?) {
+        // Search the whole class hierarchy, not just the class and its direct superclass.
+        val field: Field = generateSequence<Class<*>>(target.javaClass) { it.superclass }
+            .flatMap { it.declaredFields.asSequence() }
+            .firstOrNull { it.name == fieldName }
+            ?: error("Field $fieldName not found")
+        field.isAccessible = true
+        field.set(target, value)
+    }
 }