diff --git a/maestro-client/src/main/java/maestro/drivers/AndroidDriver.kt b/maestro-client/src/main/java/maestro/drivers/AndroidDriver.kt index ea95d99768..871feeb11b 100644 --- a/maestro-client/src/main/java/maestro/drivers/AndroidDriver.kt +++ b/maestro-client/src/main/java/maestro/drivers/AndroidDriver.kt @@ -25,6 +25,7 @@ import dadb.AdbShellPacket import dadb.AdbShellResponse import dadb.AdbShellStream import dadb.Dadb +import io.grpc.ManagedChannel import io.grpc.okhttp.OkHttpChannelBuilder import io.grpc.Metadata import io.grpc.Status @@ -40,7 +41,7 @@ import maestro.android.chromedevtools.AndroidWebViewHierarchyClient import maestro.device.DeviceOrientation import maestro.device.Platform import maestro.utils.BlockingStreamObserver -import maestro.utils.MaestroTimer +import maestro.utils.GrpcRetry import maestro.utils.Metrics import maestro.utils.MetricsProvider import maestro.utils.ScreenshotUtils @@ -76,16 +77,32 @@ class AndroidDriver( private val metrics = metricsProvider.withPrefix("maestro.driver").withTags(mapOf("platform" to "android", "emulatorName" to emulatorName)) - private val channel = OkHttpChannelBuilder.forAddress("localhost", this.hostPort) + // Channel and stubs are rebuilt on broken-pipe via [rebuildChannel] — see [runDeviceCall]. 
+ @Volatile private var channel: ManagedChannel = buildChannel() + @Volatile private var blockingStub: MaestroDriverGrpc.MaestroDriverBlockingStub = MaestroDriverGrpc.newBlockingStub(channel) + @Volatile private var asyncStub: MaestroDriverGrpc.MaestroDriverStub = MaestroDriverGrpc.newStub(channel) + private val blockingStubWithTimeout get() = blockingStub.withDeadlineAfter(120, TimeUnit.SECONDS) + + private fun buildChannel(): ManagedChannel = OkHttpChannelBuilder.forAddress("localhost", this.hostPort) .usePlaintext() .socketFactory(AdbSocketFactory { _, port -> dadb.open("tcp:$port") }) .keepAliveTime(2, TimeUnit.MINUTES) .keepAliveTimeout(20, TimeUnit.SECONDS) .keepAliveWithoutCalls(true) .build() - private val blockingStub = MaestroDriverGrpc.newBlockingStub(channel) - private val blockingStubWithTimeout get() = blockingStub.withDeadlineAfter(120, TimeUnit.SECONDS) - private val asyncStub = MaestroDriverGrpc.newStub(channel) + + private fun rebuildChannel() { + val old = channel + val fresh = buildChannel() + channel = fresh + blockingStub = MaestroDriverGrpc.newBlockingStub(fresh) + asyncStub = MaestroDriverGrpc.newStub(fresh) + try { + old.shutdownNow() + } catch (e: Exception) { + LOGGER.warn("Failed to shut down old gRPC channel after rebuild", e) + } + } private val documentBuilderFactory = DocumentBuilderFactory.newInstance() private val androidWebViewHierarchyClient = AndroidWebViewHierarchyClient(dadb) @@ -323,7 +340,9 @@ class AndroidDriver( override fun contentDescriptor(excludeKeyboardElements: Boolean): TreeNode { return metrics.measured("operation", mapOf("command" to "contentDescriptor")) { - val response = callViewHierarchy() + val response = runDeviceCall("viewHierarchy") { + blockingStubWithTimeout.viewHierarchy(viewHierarchyRequest {}) + } val document = documentBuilderFactory .newDocumentBuilder() @@ -361,40 +380,6 @@ class AndroidDriver( ) } - private fun callViewHierarchy(attempt: Int = 1): MaestroAndroid.ViewHierarchyResponse { - return 
try { - blockingStubWithTimeout.viewHierarchy(viewHierarchyRequest {}) - } catch (throwable: StatusRuntimeException) { - val status = Status.fromThrowable(throwable) - when (status.code) { - Status.Code.DEADLINE_EXCEEDED -> { - LOGGER.error("Timeout while fetching view hierarchy") - throw throwable - } - Status.Code.UNAVAILABLE -> { - if (throwable.cause is IOException || throwable.message?.contains("io exception", ignoreCase = true) == true) { - LOGGER.error("Not able to reach the gRPC server while fetching view hierarchy") - } else { - LOGGER.error("Received UNAVAILABLE status with message: ${throwable.message}") - } - } - else -> { - LOGGER.error("Unexpected error: ${status.code} - ${throwable.message}") - } - } - - // There is a bug in Android UiAutomator that rarely throws an NPE while dumping a view hierarchy. - // Trying to recover once by giving it a bit of time to settle. - LOGGER.error("Failed to get view hierarchy: ${status.description}", throwable) - - if (attempt > 0) { - MaestroTimer.sleep(MaestroTimer.Reason.BUFFER, 1000L) - return callViewHierarchy(attempt - 1) - } - throw throwable - } - } - override fun scrollVertical() { metrics.measured("operation", mapOf("command" to "scrollVertical")) { swipe(SwipeDirection.UP, 400) @@ -1270,8 +1255,16 @@ class AndroidDriver( private fun <T> runDeviceCall(callName: String, call: () -> T): T { return try { - call() + GrpcRetry.withRetry( + callName = callName, + onBrokenPipe = { + LOGGER.warn("[$callName] broken pipe to emulator — rebuilding gRPC channel before retry") + rebuildChannel() + }, + call = call, + ) } catch (throwable: StatusRuntimeException) { + // After retries are exhausted, preserve the existing boundary logging. 
val status = Status.fromThrowable(throwable) when (status.code) { Status.Code.DEADLINE_EXCEEDED -> { @@ -1281,11 +1274,10 @@ class AndroidDriver( Status.Code.UNAVAILABLE -> { if (throwable.cause is IOException || throwable.message?.contains("io exception", ignoreCase = true) == true) { LOGGER.error("Not able to reach the gRPC server while processing $callName command") - throw throwable } else { LOGGER.error("Received UNAVAILABLE status with message: ${throwable.message} while processing $callName command", throwable) - throw throwable } + throw throwable } Status.Code.INTERNAL -> { val trailers = Status.trailersFromThrowable(throwable) diff --git a/maestro-client/src/main/java/maestro/utils/GrpcRetry.kt b/maestro-client/src/main/java/maestro/utils/GrpcRetry.kt new file mode 100644 index 0000000000..926c55b3e2 --- /dev/null +++ b/maestro-client/src/main/java/maestro/utils/GrpcRetry.kt @@ -0,0 +1,43 @@ +package maestro.utils + +import io.grpc.Status +import io.grpc.StatusRuntimeException +import java.io.IOException + +object GrpcRetry { + fun <T> withRetry( + callName: String, + maxAttempts: Int = 3, + totalBudgetMs: Long = 30_000L, + nowMs: () -> Long = System::currentTimeMillis, + sleepMs: (Long) -> Unit = Thread::sleep, + onBrokenPipe: () -> Unit = {}, + call: () -> T, + ): T { + val deadlineMs = nowMs() + totalBudgetMs + var lastError: StatusRuntimeException? 
= null + repeat(maxAttempts) { attempt -> + if (nowMs() >= deadlineMs) { + throw lastError ?: error("Budget exceeded before first attempt") + } + try { + return call() + } catch (e: StatusRuntimeException) { + lastError = e + val code = Status.fromThrowable(e).code + if (code != Status.Code.UNAVAILABLE) throw e + if (attempt == maxAttempts - 1) throw e + if (isBrokenPipe(e)) onBrokenPipe() + sleepMs(200L) + } + } + throw lastError ?: error("Unreachable: maxAttempts must be > 0") + } + + private fun isBrokenPipe(e: StatusRuntimeException): Boolean { + if (e.cause is IOException) return true + val message = e.message ?: return false + return message.contains("io exception", ignoreCase = true) || + message.contains("broken pipe", ignoreCase = true) + } +} diff --git a/maestro-client/src/test/java/maestro/utils/GrpcRetryTest.kt b/maestro-client/src/test/java/maestro/utils/GrpcRetryTest.kt new file mode 100644 index 0000000000..54bd37e9b1 --- /dev/null +++ b/maestro-client/src/test/java/maestro/utils/GrpcRetryTest.kt @@ -0,0 +1,228 @@ +package maestro.utils + +import com.google.common.truth.Truth.assertThat +import io.grpc.Status +import io.grpc.StatusRuntimeException +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import java.io.IOException +import java.util.concurrent.atomic.AtomicInteger + +/** + * Tests for [GrpcRetry.withRetry] — bounded retry-with-reconnect for gRPC calls. + * + * The helper is what AndroidDriver.runDeviceCall delegates to. It exists so the + * retry policy (max attempts, total budget, broken-pipe → rebuild channel) can + * be tested without standing up a real gRPC channel or a device. 
+ */ +internal class GrpcRetryTest { + + @Test + internal fun `success on first attempt returns immediately without retry or reconnect`() { + val attempts = AtomicInteger(0) + val reconnects = AtomicInteger(0) + val sleeps = mutableListOf<Long>() + + val result = GrpcRetry.withRetry( + callName = "deviceInfo", + sleepMs = { sleeps.add(it) }, + onBrokenPipe = { reconnects.incrementAndGet() }, + ) { + attempts.incrementAndGet() + "ok" + } + + assertThat(result).isEqualTo("ok") + assertThat(attempts.get()).isEqualTo(1) + assertThat(reconnects.get()).isEqualTo(0) + assertThat(sleeps).isEmpty() + } + + @Test + internal fun `UNAVAILABLE then success — retries and returns the recovered value`() { + val attempts = AtomicInteger(0) + val sleeps = mutableListOf<Long>() + + val result = GrpcRetry.withRetry( + callName = "deviceInfo", + sleepMs = { sleeps.add(it) }, + ) { + val attempt = attempts.incrementAndGet() + if (attempt < 3) throw Status.UNAVAILABLE.asRuntimeException() + "ok" + } + + assertThat(result).isEqualTo("ok") + assertThat(attempts.get()).isEqualTo(3) + // Two retries → two sleeps between attempts 1→2 and 2→3. 
+ assertThat(sleeps).hasSize(2) + } + + @Test + internal fun `UNAVAILABLE on every attempt — bubbles the last error after maxAttempts`() { + val attempts = AtomicInteger(0) + + val ex = assertThrows<StatusRuntimeException> { + GrpcRetry.withRetry( + callName = "deviceInfo", + maxAttempts = 3, + sleepMs = { /* no-op */ }, + ) { + attempts.incrementAndGet() + throw Status.UNAVAILABLE.withDescription("attempt failure").asRuntimeException() + } + } + + assertThat(ex.status.code).isEqualTo(Status.Code.UNAVAILABLE) + assertThat(attempts.get()).isEqualTo(3) + } + + @Test + internal fun `INVALID_ARGUMENT is not retried — bubbles immediately`() { + val attempts = AtomicInteger(0) + + val ex = assertThrows<StatusRuntimeException> { + GrpcRetry.withRetry(callName = "tap", sleepMs = { /* no-op */ }) { + attempts.incrementAndGet() + throw Status.INVALID_ARGUMENT.asRuntimeException() + } + } + + assertThat(ex.status.code).isEqualTo(Status.Code.INVALID_ARGUMENT) + assertThat(attempts.get()).isEqualTo(1) + } + + @Test + internal fun `DEADLINE_EXCEEDED is not retried — emulator is hung, retry would wait again`() { + val attempts = AtomicInteger(0) + + val ex = assertThrows<StatusRuntimeException> { + GrpcRetry.withRetry(callName = "viewHierarchy", sleepMs = { /* no-op */ }) { + attempts.incrementAndGet() + throw Status.DEADLINE_EXCEEDED.asRuntimeException() + } + } + + assertThat(ex.status.code).isEqualTo(Status.Code.DEADLINE_EXCEEDED) + assertThat(attempts.get()).isEqualTo(1) + } + + @Test + internal fun `INTERNAL is not retried — surfaces server-side error to caller`() { + val attempts = AtomicInteger(0) + + val ex = assertThrows<StatusRuntimeException> { + GrpcRetry.withRetry(callName = "tap", sleepMs = { /* no-op */ }) { + attempts.incrementAndGet() + throw Status.INTERNAL.asRuntimeException() + } + } + + assertThat(ex.status.code).isEqualTo(Status.Code.INTERNAL) + assertThat(attempts.get()).isEqualTo(1) + } + + @Test + internal fun `UNAVAILABLE with IOException cause — invokes onBrokenPipe before retrying`() { + val attempts = AtomicInteger(0) + val 
reconnectsBeforeAttempt = mutableListOf<Int>() + val reconnects = AtomicInteger(0) + + val result = GrpcRetry.withRetry( + callName = "viewHierarchy", + sleepMs = { /* no-op */ }, + onBrokenPipe = { + reconnectsBeforeAttempt.add(attempts.get()) + reconnects.incrementAndGet() + }, + ) { + val attempt = attempts.incrementAndGet() + if (attempt == 1) { + throw Status.UNAVAILABLE + .withCause(IOException("Broken pipe")) + .asRuntimeException() + } + "ok" + } + + assertThat(result).isEqualTo("ok") + assertThat(attempts.get()).isEqualTo(2) + assertThat(reconnects.get()).isEqualTo(1) + // Reconnect must happen between attempt 1 (which failed) and attempt 2 (which retries). + assertThat(reconnectsBeforeAttempt).containsExactly(1) + } + + @Test + internal fun `UNAVAILABLE with 'io exception' in message — also treated as broken pipe`() { + val attempts = AtomicInteger(0) + val reconnects = AtomicInteger(0) + + val result = GrpcRetry.withRetry( + callName = "viewHierarchy", + sleepMs = { /* no-op */ }, + onBrokenPipe = { reconnects.incrementAndGet() }, + ) { + val attempt = attempts.incrementAndGet() + if (attempt == 1) { + throw Status.UNAVAILABLE + .withDescription("io exception: connection closed") + .asRuntimeException() + } + "ok" + } + + assertThat(result).isEqualTo("ok") + assertThat(reconnects.get()).isEqualTo(1) + } + + @Test + internal fun `plain UNAVAILABLE without IOException — retries but does not reconnect channel`() { + val attempts = AtomicInteger(0) + val reconnects = AtomicInteger(0) + + val result = GrpcRetry.withRetry( + callName = "viewHierarchy", + sleepMs = { /* no-op */ }, + onBrokenPipe = { reconnects.incrementAndGet() }, + ) { + val attempt = attempts.incrementAndGet() + if (attempt == 1) { + throw Status.UNAVAILABLE.withDescription("server busy").asRuntimeException() + } + "ok" + } + + assertThat(result).isEqualTo("ok") + assertThat(attempts.get()).isEqualTo(2) + assertThat(reconnects.get()).isEqualTo(0) + } + + @Test + internal fun `total budget 
exceeded — stops retrying even if maxAttempts remain`() { + val attempts = AtomicInteger(0) + // Scripted clock readings, consumed in order: + // t=0 deadline init → deadline = 30s + // t=10s budget check before attempt 1 → 10 < 30, attempt 1 runs and fails + // t=25s budget check before attempt 2 → 25 < 30, attempt 2 runs and fails + // t=40s budget check before attempt 3 → 40 ≥ 30, stop and rethrow + // maxAttempts is 5 but the budget cuts us off at 2 attempts. + val timeline = ArrayDeque(mutableListOf(0L, 10_000L, 25_000L, 40_000L)) + val nowMs: () -> Long = { timeline.removeFirstOrNull() ?: 40_000L } + + val ex = assertThrows<StatusRuntimeException> { + GrpcRetry.withRetry( + callName = "deviceInfo", + maxAttempts = 5, + totalBudgetMs = 30_000L, + nowMs = nowMs, + sleepMs = { /* no-op */ }, + ) { + attempts.incrementAndGet() + throw Status.UNAVAILABLE.asRuntimeException() + } + } + + assertThat(ex.status.code).isEqualTo(Status.Code.UNAVAILABLE) + assertThat(attempts.get()).isEqualTo(2) + } +}