Skip to content

Commit 50776b8

Browse files
committed
feat(ios): proper implementation of output idle timeout
1 parent 64c30e3 commit 50776b8

File tree

6 files changed

+44
-24
lines changed

6 files changed

+44
-24
lines changed

core/src/main/kotlin/com/malinskiy/marathon/Marathon.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ class Marathon(
124124
throw ConfigurationException("Unknown test parser type for ${testParser::class}, should inherit from either ${LocalTestParser::class.simpleName} or ${RemoteTestParser::class.simpleName}")
125125
}
126126
}
127-
}.let { println("Took $it ms") }
127+
}.let { log.debug { "Parsing took $it ms" } }
128128

129129
usageTracker.meta(
130130
version = BuildConfig.VERSION, releaseMode = BuildConfig.RELEASE_MODE, vendor = when (configuration.vendorConfiguration) {

vendor/vendor-ios/src/main/kotlin/com/malinskiy/marathon/ios/cmd/BaseCommand.kt

+26-12
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
package com.malinskiy.marathon.ios.cmd
22

3+
import kotlinx.coroutines.CancellationException
34
import kotlinx.coroutines.CompletableJob
45
import kotlinx.coroutines.Deferred
56
import kotlinx.coroutines.Dispatchers
67
import kotlinx.coroutines.async
8+
import kotlinx.coroutines.awaitAll
9+
import kotlinx.coroutines.cancel
710
import kotlinx.coroutines.cancelAndJoin
811
import kotlinx.coroutines.channels.ReceiveChannel
12+
import kotlinx.coroutines.channels.onClosed
13+
import kotlinx.coroutines.channels.onFailure
14+
import kotlinx.coroutines.channels.onSuccess
915
import kotlinx.coroutines.runBlocking
1016
import kotlinx.coroutines.supervisorScope
1117
import kotlinx.coroutines.withContext
@@ -19,29 +25,37 @@ abstract class BaseCommand(
1925
override suspend fun await(): CommandResult = withContext(Dispatchers.IO) {
2026
val deferredStdout = supervisorScope {
2127
async(job) {
22-
val stdoutBuffer = mutableListOf<String>()
23-
for (line in stdout) {
24-
stdoutBuffer.add(line)
28+
val buffer = mutableListOf<String>()
29+
while(true) {
30+
val channelResult = stdout.receiveCatching()
31+
channelResult.onSuccess { buffer.add(it) }
32+
channelResult.onClosed { if (it != null) cancel(CancellationException("Channel closed", it)) }
33+
channelResult.onFailure { if (it != null) cancel(CancellationException("Channel failed", it)) }
34+
35+
if (!channelResult.isSuccess) break
2536
}
26-
stdoutBuffer
37+
buffer
2738
}
2839
}
2940

3041
val deferredStderr = supervisorScope {
3142
async(job) {
32-
val stderrBuffer = mutableListOf<String>()
33-
for (line in stderr) {
34-
stderrBuffer.add(line)
43+
val buffer = mutableListOf<String>()
44+
while(true) {
45+
val channelResult = stderr.receiveCatching()
46+
channelResult.onSuccess { buffer.add(it) }
47+
channelResult.onClosed { if (it != null) cancel(CancellationException("Channel closed", it)) }
48+
channelResult.onFailure { if (it != null) cancel(CancellationException("Channel failed", it)) }
49+
50+
if (!channelResult.isSuccess) break
3551
}
36-
stderrBuffer
52+
buffer
3753
}
3854
}
3955

40-
val out = deferredStdout.await()
41-
val err = deferredStderr.await()
42-
val exitCode = exitCode.await()
56+
val (out, err, exitCode) = awaitAll(deferredStdout, deferredStderr, exitCode)
4357

44-
CommandResult(out, err, exitCode)
58+
CommandResult(out as List<String>, err as List<String>, exitCode as Int)
4559
}
4660

4761
override suspend fun drain() {

vendor/vendor-ios/src/main/kotlin/com/malinskiy/marathon/ios/cmd/local/KotlinProcessCommandExecutor.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class KotlinProcessCommandExecutor(
6666
}
6767

6868
val stdout = produceLinesManually(job, process.inputStream, idleTimeout, charset, channelCapacity) { process.isAlive && !exitCode.isCompleted }
69-
val stderr = produceLinesManually(job, process.errorStream, idleTimeout, charset, channelCapacity) { process.isAlive && !exitCode.isCompleted }
69+
val stderr = produceLinesManually(job, process.errorStream, maxOf(idleTimeout, timeout), charset, channelCapacity) { process.isAlive && !exitCode.isCompleted }
7070

7171

7272
return KotlinProcessCommand(

vendor/vendor-ios/src/main/kotlin/com/malinskiy/marathon/ios/cmd/remote/ssh/sshj/SshjCommandExecutor.kt

+5-3
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,11 @@ class SshjCommandExecutor(
5555
}
5656
}
5757
val cmd = session.exec(escapedCmd)
58-
58+
5959
val stdout = produceLinesManually(job, cmd.inputStream, idleTimeout, charset, channelCapacity) { cmd.isOpen && !cmd.isEOF }
60-
val stderr = produceLinesManually(job, cmd.errorStream, idleTimeout, charset, channelCapacity) { cmd.isOpen && !cmd.isEOF }
60+
//Idling on stderr should not be considered bad
61+
//It would be nice to combine stdout and stderr but simple version here is enough for now
62+
val stderr = produceLinesManually(job, cmd.errorStream, maxOf(idleTimeout, timeout), charset, channelCapacity) { cmd.isOpen && !cmd.isEOF }
6163
val exitCode: Deferred<Int?> = async(job) {
6264
val result = withTimeoutOrNull(timeout) {
6365
cmd.suspendFor()
@@ -91,7 +93,7 @@ private fun String.isShellscaped(): Boolean {
9193

9294
private val SHELL_SPECIAL_CHARS = setOf('"',' ','$','\'','\\','#','=','[',']','!','>','<','|',';','{','}','(',')','*','?','~','&')
9395
private fun String.containsShellSpecialChars(): Boolean {
94-
return any {
96+
return any {
9597
SHELL_SPECIAL_CHARS.contains(it)
9698
}
9799
}

vendor/vendor-ios/src/main/kotlin/com/malinskiy/marathon/ios/cmd/remote/ssh/sshj/SshjCommandSession.kt

+4-2
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ import com.malinskiy.marathon.ios.cmd.BaseCommand
44
import kotlinx.coroutines.CompletableJob
55
import kotlinx.coroutines.Deferred
66
import kotlinx.coroutines.channels.ReceiveChannel
7-
import kotlinx.coroutines.channels.SendChannel
8-
import kotlinx.coroutines.channels.trySendBlocking
97
import net.schmizz.sshj.connection.channel.direct.Session
108
import net.schmizz.sshj.connection.channel.direct.Signal
119
import java.util.concurrent.atomic.AtomicBoolean
@@ -32,6 +30,10 @@ class SshjCommandSession(
3230

3331
override fun close() {
3432
if (!closed.getAndSet(true)) {
33+
if (command.isOpen) {
34+
terminate()
35+
}
36+
command.close()
3537
command.join()
3638
super.close()
3739
}

vendor/vendor-ios/src/main/kotlin/com/malinskiy/marathon/ios/extensions/Reader.kt

+7-5
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,13 @@ fun CoroutineScope.produceLinesManually(
6666
available > 0 -> inputStream.read(byteArray, 0, min(available, byteArray.size))
6767
else -> 0
6868
}
69+
70+
val timeSinceLastOutputMillis = System.currentTimeMillis() - lastOutputTimeMillis
71+
if (timeSinceLastOutputMillis > idleTimeout.toMillis()) {
72+
close(TimeoutException("idle timeout $idleTimeout reached"))
73+
break
74+
}
75+
6976
// if there was nothing to read
7077
if (count == 0) {
7178
// if session received EOF or has been closed, reading stops
@@ -77,11 +84,6 @@ fun CoroutineScope.produceLinesManually(
7784
} else if (count == -1) {
7885
break
7986
} else {
80-
val timeSinceLastOutputMillis = System.currentTimeMillis() - lastOutputTimeMillis
81-
if (timeSinceLastOutputMillis > idleTimeout.toMillis()) {
82-
close(TimeoutException("idle timeout $idleTimeout reached"))
83-
break
84-
}
8587
lineBuffer.append(byteArray, count)
8688
lastOutputTimeMillis = System.currentTimeMillis()
8789
}

0 commit comments

Comments
 (0)