Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 35 additions & 43 deletions maestro-client/src/main/java/maestro/drivers/AndroidDriver.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import dadb.AdbShellPacket
import dadb.AdbShellResponse
import dadb.AdbShellStream
import dadb.Dadb
import io.grpc.ManagedChannel
import io.grpc.okhttp.OkHttpChannelBuilder
import io.grpc.Metadata
import io.grpc.Status
Expand All @@ -40,7 +41,7 @@ import maestro.android.chromedevtools.AndroidWebViewHierarchyClient
import maestro.device.DeviceOrientation
import maestro.device.Platform
import maestro.utils.BlockingStreamObserver
import maestro.utils.MaestroTimer
import maestro.utils.GrpcRetry
import maestro.utils.Metrics
import maestro.utils.MetricsProvider
import maestro.utils.ScreenshotUtils
Expand Down Expand Up @@ -76,16 +77,32 @@ class AndroidDriver(

private val metrics = metricsProvider.withPrefix("maestro.driver").withTags(mapOf("platform" to "android", "emulatorName" to emulatorName))

private val channel = OkHttpChannelBuilder.forAddress("localhost", this.hostPort)
// Channel and stubs are rebuilt on broken-pipe via [rebuildChannel] — see [runDeviceCall].
@Volatile private var channel: ManagedChannel = buildChannel()
@Volatile private var blockingStub: MaestroDriverGrpc.MaestroDriverBlockingStub = MaestroDriverGrpc.newBlockingStub(channel)
@Volatile private var asyncStub: MaestroDriverGrpc.MaestroDriverStub = MaestroDriverGrpc.newStub(channel)
private val blockingStubWithTimeout get() = blockingStub.withDeadlineAfter(120, TimeUnit.SECONDS)

private fun buildChannel(): ManagedChannel = OkHttpChannelBuilder.forAddress("localhost", this.hostPort)
.usePlaintext()
.socketFactory(AdbSocketFactory { _, port -> dadb.open("tcp:$port") })
.keepAliveTime(2, TimeUnit.MINUTES)
.keepAliveTimeout(20, TimeUnit.SECONDS)
.keepAliveWithoutCalls(true)
.build()
private val blockingStub = MaestroDriverGrpc.newBlockingStub(channel)
private val blockingStubWithTimeout get() = blockingStub.withDeadlineAfter(120, TimeUnit.SECONDS)
private val asyncStub = MaestroDriverGrpc.newStub(channel)

private fun rebuildChannel() {
val old = channel
val fresh = buildChannel()
channel = fresh
blockingStub = MaestroDriverGrpc.newBlockingStub(fresh)
asyncStub = MaestroDriverGrpc.newStub(fresh)
try {
old.shutdownNow()
} catch (e: Exception) {
LOGGER.warn("Failed to shut down old gRPC channel after rebuild", e)
}
}
private val documentBuilderFactory = DocumentBuilderFactory.newInstance()

private val androidWebViewHierarchyClient = AndroidWebViewHierarchyClient(dadb)
Expand Down Expand Up @@ -323,7 +340,9 @@ class AndroidDriver(

override fun contentDescriptor(excludeKeyboardElements: Boolean): TreeNode {
return metrics.measured("operation", mapOf("command" to "contentDescriptor")) {
val response = callViewHierarchy()
val response = runDeviceCall("viewHierarchy") {
blockingStubWithTimeout.viewHierarchy(viewHierarchyRequest {})
}

val document = documentBuilderFactory
.newDocumentBuilder()
Expand Down Expand Up @@ -361,40 +380,6 @@ class AndroidDriver(
)
}

private fun callViewHierarchy(attempt: Int = 1): MaestroAndroid.ViewHierarchyResponse {
return try {
blockingStubWithTimeout.viewHierarchy(viewHierarchyRequest {})
} catch (throwable: StatusRuntimeException) {
val status = Status.fromThrowable(throwable)
when (status.code) {
Status.Code.DEADLINE_EXCEEDED -> {
LOGGER.error("Timeout while fetching view hierarchy")
throw throwable
}
Status.Code.UNAVAILABLE -> {
if (throwable.cause is IOException || throwable.message?.contains("io exception", ignoreCase = true) == true) {
LOGGER.error("Not able to reach the gRPC server while fetching view hierarchy")
} else {
LOGGER.error("Received UNAVAILABLE status with message: ${throwable.message}")
}
}
else -> {
LOGGER.error("Unexpected error: ${status.code} - ${throwable.message}")
}
}

// There is a bug in Android UiAutomator that rarely throws an NPE while dumping a view hierarchy.
// Trying to recover once by giving it a bit of time to settle.
LOGGER.error("Failed to get view hierarchy: ${status.description}", throwable)

if (attempt > 0) {
MaestroTimer.sleep(MaestroTimer.Reason.BUFFER, 1000L)
return callViewHierarchy(attempt - 1)
}
throw throwable
}
}

override fun scrollVertical() {
metrics.measured("operation", mapOf("command" to "scrollVertical")) {
swipe(SwipeDirection.UP, 400)
Expand Down Expand Up @@ -1270,8 +1255,16 @@ class AndroidDriver(

private fun <T> runDeviceCall(callName: String, call: () -> T): T {
return try {
call()
GrpcRetry.withRetry(
callName = callName,
onBrokenPipe = {
LOGGER.warn("[$callName] broken pipe to emulator — rebuilding gRPC channel before retry")
rebuildChannel()
},
call = call,
)
} catch (throwable: StatusRuntimeException) {
// After retries are exhausted, preserve the existing boundary logging.
val status = Status.fromThrowable(throwable)
when (status.code) {
Status.Code.DEADLINE_EXCEEDED -> {
Expand All @@ -1281,11 +1274,10 @@ class AndroidDriver(
Status.Code.UNAVAILABLE -> {
if (throwable.cause is IOException || throwable.message?.contains("io exception", ignoreCase = true) == true) {
LOGGER.error("Not able to reach the gRPC server while processing $callName command")
throw throwable
} else {
LOGGER.error("Received UNAVAILABLE status with message: ${throwable.message} while processing $callName command", throwable)
throw throwable
}
throw throwable
}
Status.Code.INTERNAL -> {
val trailers = Status.trailersFromThrowable(throwable)
Expand Down
43 changes: 43 additions & 0 deletions maestro-client/src/main/java/maestro/utils/GrpcRetry.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package maestro.utils

import io.grpc.Status
import io.grpc.StatusRuntimeException
import java.io.IOException

object GrpcRetry {
fun <T> withRetry(
callName: String,
maxAttempts: Int = 3,
totalBudgetMs: Long = 30_000L,
nowMs: () -> Long = System::currentTimeMillis,
sleepMs: (Long) -> Unit = Thread::sleep,
onBrokenPipe: () -> Unit = {},
call: () -> T,
): T {
val deadlineMs = nowMs() + totalBudgetMs
var lastError: StatusRuntimeException? = null
repeat(maxAttempts) { attempt ->
if (nowMs() >= deadlineMs) {
throw lastError ?: error("Budget exceeded before first attempt")
}
try {
return call()
} catch (e: StatusRuntimeException) {
lastError = e
val code = Status.fromThrowable(e).code
if (code != Status.Code.UNAVAILABLE) throw e
if (attempt == maxAttempts - 1) throw e
if (isBrokenPipe(e)) onBrokenPipe()
sleepMs(200L)
}
}
throw lastError ?: error("Unreachable: maxAttempts must be > 0")
}

private fun isBrokenPipe(e: StatusRuntimeException): Boolean {
if (e.cause is IOException) return true
val message = e.message ?: return false
return message.contains("io exception", ignoreCase = true) ||
message.contains("broken pipe", ignoreCase = true)
}
}
Loading
Loading