Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

- Add `--shard-split-dynamic` for work-stealing parallel test execution: flows are distributed across N devices via a shared queue, so faster devices pick up new flows immediately instead of sitting idle
- Add `--min-healthy-devices` (default: 2) for `--shard-split-dynamic`: the run is aborted when the number of healthy devices drops below this threshold, so that a single surviving device does not end up running the full queue
- Automatically stop the app after each flow to free device memory, preventing slowdown in long test suites

## 2.6.0

- Add Maestro Viewer - a desktop visualizer for live hierarchy and commands
Expand Down
6 changes: 6 additions & 0 deletions e2e/cli/test-cli.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ check "maestro test subcommand gives usage instructions when called with --help"
check "maestro bugreport gives instruction" \
"maestro bugreport" includes "https://github.com/mobile-dev-inc/Maestro/issues"

check "maestro test --help includes --shard-split-dynamic flag" \
"maestro test --help" includes "--shard-split-dynamic"

check "maestro test --help includes --min-healthy-devices flag" \
"maestro test --help" includes "--min-healthy-devices"

echo ""
echo "$PASS passed, $FAIL failed"
[ $FAIL -eq 0 ]
34 changes: 34 additions & 0 deletions install-dynamic.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#!/bin/sh
# Installs the Maestro fork with --shard-split-dynamic support.
# Drop-in replacement for the official https://get.maestro.mobile.dev installer.
#
# Usage:
# curl -Ls https://raw.githubusercontent.com/thamys-moraes/Maestro/feature/dynamic-sharding/install-dynamic.sh | bash
#
# The release archive is downloaded and unpacked into a temporary directory
# first; the existing installation under $HOME/.maestro is only replaced once
# the new bin/ and lib/ directories are actually available.

set -e

RELEASE_URL="https://github.com/thamys-moraes/Maestro/releases/download/v2.6.0-dynamic/maestro-dynamic-macos.zip"
INSTALL_DIR="$HOME/.maestro"
# Line appended to shell profiles; kept literal so $HOME/$PATH expand at login.
PATH_LINE="export PATH=\"\$HOME/.maestro/bin:\$PATH\""

echo "Installing Maestro fork (v2.6.0-dynamic)..."

# Download and extract into a scratch directory that is removed on any exit,
# including failures triggered by `set -e`.
TMP=$(mktemp -d)
trap 'rm -rf "$TMP"' EXIT
trap 'exit 1' HUP INT TERM

# -f makes curl exit non-zero on HTTP errors instead of saving the error page.
curl -fLs "$RELEASE_URL" -o "$TMP/maestro.zip"
unzip -q "$TMP/maestro.zip" -d "$TMP"

# Do not touch the current installation if the archive layout is unexpected.
if [ ! -d "$TMP/maestro/bin" ] || [ ! -d "$TMP/maestro/lib" ]; then
    echo "Downloaded archive does not contain maestro/bin and maestro/lib" >&2
    exit 1
fi

# Remove previous installation and ensure target directory exists
mkdir -p "$INSTALL_DIR"
rm -rf "$INSTALL_DIR/bin" "$INSTALL_DIR/lib"
cp -r "$TMP/maestro/bin" "$INSTALL_DIR/bin"
cp -r "$TMP/maestro/lib" "$INSTALL_DIR/lib"

# Appends PATH_LINE to the given profile file unless it is already present.
# Failures to write are ignored (best effort), matching the installer's intent.
add_path_line() {
    if [ -f "$1" ] && grep -qF -- "$PATH_LINE" "$1"; then
        return 0
    fi
    echo "$PATH_LINE" >> "$1" 2>/dev/null || true
}

# Ensure PATH: match the directory as a whole PATH entry, not as a regex/substring.
case ":$PATH:" in
    *":$INSTALL_DIR/bin:"*) ;;
    *)
        add_path_line "$HOME/.zshrc"
        add_path_line "$HOME/.bash_profile"
        ;;
esac

export PATH="$INSTALL_DIR/bin:$PATH"
echo "Installed: $(maestro --version)"
63 changes: 62 additions & 1 deletion maestro-cli/src/main/java/maestro/cli/command/TestCommand.kt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import maestro.cli.model.TestExecutionSummary
import maestro.cli.report.ReportFormat
import maestro.cli.report.ReporterFactory
import maestro.cli.report.TestDebugReporter
import maestro.cli.runner.DynamicShardScheduler
import maestro.cli.runner.TestRunner
import maestro.cli.runner.TestSuiteInteractor
import maestro.cli.runner.resultview.AnsiResultView
Expand Down Expand Up @@ -121,6 +122,22 @@ class TestCommand : Callable<Int> {
)
private var shardAll: Int? = null

@Option(
names = ["--shard-split-dynamic"],
description = ["Distribute flows dynamically across N devices using a shared queue (work-stealing). " +
"Mutually exclusive with --shard-split and --shard-all."],
)
private var shardSplitDynamic: Int? = null

@Option(
names = ["--min-healthy-devices"],
description = ["Minimum number of healthy devices required for --shard-split-dynamic to continue. " +
"If alive devices drop below this threshold the run is cancelled. (default: \${DEFAULT-VALUE})"],
defaultValue = "2",
)
private var minHealthyDevices: Int = 2


@Option(names = ["-c", "--continuous"])
private var continuous: Boolean = false

Expand Down Expand Up @@ -256,6 +273,9 @@ class TestCommand : Callable<Int> {
if (shardSplit != null && shardAll != null) {
throw CliError("Options --shard-split and --shard-all are mutually exclusive.")
}
if (shardSplitDynamic != null && (shardSplit != null || shardAll != null)) {
throw CliError("Option --shard-split-dynamic is mutually exclusive with --shard-split and --shard-all.")
}

@Suppress("DEPRECATION")
if (legacyShardCount != null) {
Expand Down Expand Up @@ -356,7 +376,7 @@ class TestCommand : Callable<Int> {
}

private fun handleSessions(debugOutputPath: Path, plan: ExecutionPlan, testOutputDir: Path?): Int = runBlocking(Dispatchers.IO) {
val requestedShards = shardSplit ?: shardAll ?: 1
val requestedShards = shardSplit ?: shardAll ?: shardSplitDynamic ?: 1
if (requestedShards > 1 && plan.sequence.flows.isNotEmpty()) {
error("Cannot run sharded tests with sequential execution")
}
Expand Down Expand Up @@ -412,6 +432,47 @@ class TestCommand : Callable<Int> {
"Will use $effectiveShards shards instead."
if (shardAll == null && requestedShards > plan.flowsToRun.size) PrintUtils.warn(warning)

// === Dynamic work-stealing path ===
if (shardSplitDynamic != null) {
val workers = shardSplitDynamic!!.coerceAtMost(deviceIds.size)
val flowCount = plan.flowsToRun.size
PrintUtils.info(
"Will dynamically distribute $flowCount flows across $workers devices " +
"(shared queue, min-healthy: $minHealthyDevices)"
)

if (flowCount > 5) showCloudFasterResultsPromotionMessageIfNeeded()

val summaries = DynamicShardScheduler(
plan = plan,
deviceIds = deviceIds.take(workers),
minHealthyDevices = minHealthyDevices,
env = env,
debugOutputPath = debugOutputPath,
testOutputDir = testOutputDir,
host = parent?.host,
port = parent?.port,
teamId = appleTeamId,
platform = platform ?: parent?.platform,
isHeadless = headless,
screenSize = screenSize,
reinstallDriver = reinstallDriver,
reporter = ReporterFactory.buildReporter(format, testSuiteName),
captureSteps = format == ReportFormat.HTML_DETAILED,
).run()

val passed = summaries.sumOf { it.passedCount ?: 0 }
val total = summaries.sumOf { it.totalTests ?: 0 }

if (passed != total) showCloudDebugPromotionMessageIfNeeded()
summaries.mergeSummaries()?.saveReport()
if (summaries.size > 1) printShardsMessage(passed, total, summaries)
if (analyze) TestAnalysisManager(apiUrl = apiUrl, apiKey = apiKey).runAnalysis(debugOutputPath)

return@runBlocking if (passed == total) 0 else 1
}

// === Static (existing) path ===
val chunkPlans = makeChunkPlans(plan, effectiveShards, onlySequenceFlows)

val flowCount = if (onlySequenceFlows) plan.sequence.flows.size else plan.flowsToRun.size
Expand Down
142 changes: 142 additions & 0 deletions maestro-cli/src/main/java/maestro/cli/runner/DynamicShardScheduler.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package maestro.cli.runner

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import maestro.cli.CliError
import maestro.cli.model.TestExecutionSummary
import maestro.cli.report.ReportFormat
import maestro.cli.report.TestSuiteReporter
import maestro.cli.session.MaestroSessionManager
import maestro.orchestra.workspace.WorkspaceExecutionPlanner.ExecutionPlan
import org.slf4j.LoggerFactory
import java.net.ServerSocket
import java.nio.file.Path
import java.util.Collections
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference

/**
 * Distributes flows across devices using a shared queue (work-stealing).
 *
 * Each device runs as an independent worker: it opens a single Maestro session and consumes
 * flows from the shared queue one at a time until the queue is empty. Devices that finish
 * early pick up remaining flows rather than sitting idle.
 *
 * Fail-fast: when the number of alive workers drops below [minHealthyDevices], the run is
 * aborted with a [CliError] to prevent a single surviving device from running all remaining flows.
 * A worker is counted as lost at most once, even if it reports a crash and its session
 * subsequently fails as well.
 *
 * Re-enqueue: if a worker's session crashes mid-flow, the in-progress flow is returned to the
 * queue and another worker will pick it up (as long as enough healthy devices remain).
 *
 * NOTE(review): the threshold is only evaluated when a worker is lost, so starting a run with
 * fewer than [minHealthyDevices] devices does not abort by itself — confirm this is intended.
 */
class DynamicShardScheduler(
    private val plan: ExecutionPlan,
    private val deviceIds: List<String>,
    private val minHealthyDevices: Int,
    private val env: Map<String, String>,
    private val debugOutputPath: Path,
    private val testOutputDir: Path?,
    private val host: String?,
    private val port: Int?,
    private val teamId: String?,
    private val platform: String?,
    private val isHeadless: Boolean,
    private val screenSize: String?,
    private val reinstallDriver: Boolean,
    private val reporter: TestSuiteReporter,
    private val captureSteps: Boolean,
) {

    private val logger = LoggerFactory.getLogger(DynamicShardScheduler::class.java)

    /**
     * Starts one worker per entry in [deviceIds], lets them drain the shared flow queue and
     * waits for all of them to finish.
     *
     * @return the summaries produced by the workers whose session ran to completion.
     * @throws CliError if the number of alive workers dropped below [minHealthyDevices].
     */
    suspend fun run(): List<TestExecutionSummary> = coroutineScope {
        val flowQueue = Channel<Path>(Channel.UNLIMITED)
        val pending = AtomicInteger(plan.flowsToRun.size)
        val aliveWorkers = AtomicInteger(deviceIds.size)
        val cancelled = AtomicBoolean(false)
        val cancellationReason = AtomicReference<String?>()
        val summaries = Collections.synchronizedList(mutableListOf<TestExecutionSummary>())

        // The channel is unlimited, so seeding it never suspends or fails for capacity reasons.
        plan.flowsToRun.forEach { flowQueue.trySend(it) }

        val jobs = deviceIds.mapIndexed { workerIndex, deviceId ->
            async(Dispatchers.IO + CoroutineName("worker-$workerIndex")) {
                if (cancelled.get()) return@async

                // A device may be reported as crashed via the callback and then also make the
                // session throw; it must still be subtracted from the alive count only once.
                val reportedUnhealthy = AtomicBoolean(false)
                fun markUnhealthy() {
                    if (reportedUnhealthy.compareAndSet(false, true)) {
                        decrementAndCheckAlive(workerIndex, deviceId, aliveWorkers, cancelled, cancellationReason)
                    }
                }

                val driverHostPort = selectPort()
                try {
                    MaestroSessionManager.newSession(
                        host = host,
                        port = port,
                        teamId = teamId,
                        driverHostPort = driverHostPort,
                        deviceId = deviceId,
                        platform = platform,
                        isHeadless = isHeadless,
                        screenSize = screenSize,
                        reinstallDriver = reinstallDriver,
                        executionPlan = plan,
                    ) { session ->
                        val interactor = TestSuiteInteractor(
                            maestro = session.maestro,
                            device = session.device,
                            shardIndex = workerIndex,
                            reporter = reporter,
                            captureSteps = captureSteps,
                        )
                        val summary = runBlocking {
                            interactor.runFromQueue(
                                flowQueue = flowQueue,
                                pending = pending,
                                cancelled = cancelled,
                                onDeviceCrash = { crashedFlow ->
                                    // Return the flow to the queue for another worker to pick up.
                                    flowQueue.trySend(crashedFlow)
                                    markUnhealthy()
                                },
                                env = env,
                                debugOutputPath = debugOutputPath,
                                testOutputDir = testOutputDir,
                                deviceId = deviceId,
                            )
                        }
                        summaries.add(summary)
                    }
                } catch (e: CancellationException) {
                    // Coroutine cancellation is not a device failure; let it propagate.
                    throw e
                } catch (e: Exception) {
                    // Session failed (e.g. device unreachable at startup or lost while running).
                    logger.error("[worker-$workerIndex] Session for device $deviceId failed: ${e.message}", e)
                    markUnhealthy()
                }
            }
        }

        jobs.awaitAll()

        cancellationReason.get()?.let { throw CliError(it) }

        summaries
    }

    /**
     * Records that one worker is gone and, if the alive count fell below [minHealthyDevices],
     * flips [cancelled] and stores the abort message in [cancellationReason].
     *
     * Only the first caller that flips [cancelled] writes the reason, so the message reflects
     * the alive count at the moment the threshold was first crossed.
     */
    private fun decrementAndCheckAlive(
        workerIndex: Int,
        deviceId: String,
        aliveWorkers: AtomicInteger,
        cancelled: AtomicBoolean,
        cancellationReason: AtomicReference<String?>,
    ) {
        val alive = aliveWorkers.decrementAndGet()
        logger.warn("[worker-$workerIndex] Device $deviceId is no longer healthy. Alive workers: $alive (minimum: $minHealthyDevices)")
        if (alive < minHealthyDevices && !cancelled.getAndSet(true)) {
            cancellationReason.set(
                "Aborting dynamic run: only $alive healthy device(s) remaining, minimum is $minHealthyDevices"
            )
        }
    }

    /**
     * Asks the OS for a free port by binding a socket to port 0 and closing it right away.
     * The port is not reserved after the socket is closed.
     */
    private fun selectPort(): Int = ServerSocket(0).use { it.localPort }
}
Loading