Skip to content

Commit f0089c6

Browse files
committed
hacks to get tests passing
- there were a lot of problems produced in conversion to kotlin 1.3, im really not sure where all of them come from - one major problem is that by eagerly creating stderr and stdout Channel<Char>s, theres some new back-pressure they're exerting on the SimpleMulticaster that I dont understand. - most tests pass, this should be enough for us to get started on some of our refactors in our project - i will need to do another overhaul. The state-machine work isnt very promising, so im not sure how i'm going to get these things all connected.
1 parent a11f405 commit f0089c6

File tree

11 files changed

+87
-88
lines changed

11 files changed

+87
-88
lines changed

Diff for: build.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ plugins {
1717

1818
allprojects {
1919
group 'groostav'
20-
version '0.5'
20+
version '0.6'
2121

2222
repositories {
2323
mavenCentral()
@@ -30,7 +30,7 @@ subprojects {
3030
apply plugin: 'java'
3131
apply plugin: 'kotlin'
3232

33-
sourceCompatibility = 1.8
33+
sourceCompatibility = 9
3434
}
3535

3636
project('core'){

Diff for: core/src/main/kotlin/groostav/kotlinx/exec/ChannelPumps.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import java.io.OutputStreamWriter
1313

1414

1515
internal fun OutputStream.toSendChannel(config: ProcessBuilder): SendChannel<Char> {
16-
return config.scope.actor<Char>(Unconfined + CoroutineName("process.stdin")) {
16+
return GlobalScope.actor<Char>(Unconfined + CoroutineName("process.stdin")) {
1717

1818
val writer = OutputStreamWriter(this@toSendChannel, config.encoding)
1919

Diff for: core/src/main/kotlin/groostav/kotlinx/exec/ProcessBuilder.kt

+12-42
Original file line numberDiff line numberDiff line change
@@ -94,23 +94,15 @@ data class ProcessBuilder internal constructor(
9494
var includeDescendantsInKill: Boolean = false,
9595

9696
/**
97-
* If the resulting [RunningProcess] instance completes with an exit code that is in
98-
* the specified list the [RunningProcess] instance will complete normally, if it completes
99-
* with a value that is not in the list then it will fail.
97+
* Specifies the exit codes that are considered successful, determining whether the [RunningProcess]
98+
* completes normally or fails with an exception.
10099
*
101-
* How this set is interpreted:
102-
*
103-
* 1. If a process exits with a code that is in this set, then calls to await [RunningProcess.exitCode]
104-
* will yield that value.
105-
* 2. If the set is empty, all exit codes will be treated as valid and be yielded from
106-
* [RunningProcess.exitCode]
107-
* 3. If a process exists with a code that is not in this list
108-
* and the list is not empty, an [InvalidExitValueException]
109-
* is thrown when awaiting [RunningProcess.exitCode].
110-
*
111-
* see [ANY_EXIT_CODE] for allowing the process to return normally regardless of exit code
100+
* If a [RunningProcess] instance exits...
101+
* 1. ...and this set is `null` then all exit codes will be treated as valid will be used to complete [RunningProcess.exitCode]
102+
* 2. ...with an exit code that is in this set then it will be used to complete [RunningProcess.exitCode]
103+
* 3. ...with an exit code that is not in this non-null set then [RunningProcess.exitCode] throws [InvalidExitValueException].
112104
*/
113-
var expectedOutputCodes: Set<Int> = setOf(0), //see also
105+
var expectedOutputCodes: Set<Int>? = setOf(0), //see also
114106

115107
/**
116108
* Number of lines to be kept for generation of the exception on a bad exit code.
@@ -125,31 +117,9 @@ data class ProcessBuilder internal constructor(
125117
var linesForExceptionError: Int = 15,
126118

127119
//used to point at caller of exec() through suspension context
128-
internal var source: ExecEntryPoint? = null,
129-
internal val scope: CoroutineScope
130-
) {
131-
132-
override fun toString(): String = "ProcessBuilder(" +
133-
"command=$command, " +
134-
"workingDirectory=$workingDirectory, " +
135-
"delimiters=${delimiters.toString().encodeLineChars()}, " +
136-
"inputFlushMarker=${inputFlushMarker.toString().encodeLineChars()}, " +
137-
"encoding=$encoding, " +
138-
"standardErrorBufferCharCount=$standardErrorBufferCharCount, " +
139-
"standardOutputBufferCharCount=$standardOutputBufferCharCount, " +
140-
"aggregateOutputBufferLineCount=$aggregateOutputBufferLineCount, " +
141-
"gracefulTimeousMillis=$gracefulTimeousMillis, " +
142-
"includeDescendantsInKill=$includeDescendantsInKill, " +
143-
"expectedOutputCodes=$expectedOutputCodes, " +
144-
"linesForExceptionError=$linesForExceptionError" +
145-
")"
146-
}
147-
148-
/**
149-
* Indicates that a process can return with any exit code.
150-
*/
151-
//TODO kotlin 1.3 includes BitSet
152-
val ANY_EXIT_CODE: Set<Int> = (0..Int.MAX_VALUE).asSet()
120+
internal var source: ExecEntryPoint? = null
121+
// internal val scope: CoroutineScope
122+
)
153123

154124
object InheritedDefaultEnvironment: Map<String, String> by System.getenv()
155125

@@ -159,13 +129,13 @@ private fun String.encodeLineChars() = this
159129

160130
internal inline fun processBuilder(coroutineScope: CoroutineScope, configureBlock: ProcessBuilder.() -> Unit): ProcessBuilder {
161131

162-
val initial = ProcessBuilder(scope = coroutineScope).apply(configureBlock)
132+
val initial = ProcessBuilder().apply(configureBlock)
163133
val initialCommandList = initial.command.toList()
164134

165135
val result = initial.copy (
166136
command = initialCommandList,
167137
delimiters = initial.delimiters.toList(),
168-
expectedOutputCodes = initial.expectedOutputCodes.toSet(),
138+
expectedOutputCodes = initial.expectedOutputCodes?.toSet(),
169139
environment = if(initial.environment === InheritedDefaultEnvironment) initial.environment else initial.environment.toMap()
170140

171141
//dont deep-copy source, since its internal

Diff for: core/src/main/kotlin/groostav/kotlinx/exec/RunningProcess.kt

+16-15
Original file line numberDiff line numberDiff line change
@@ -191,20 +191,22 @@ internal class RunningProcessImpl(
191191

192192
// region output
193193

194-
override val standardOutput: ReceiveChannel<Char> =
195-
_standardOutputSource.openSubscription().tail(config.standardOutputBufferCharCount)
194+
override val standardOutput: ReceiveChannel<Char>
195+
get() = TODO()
196+
// = _standardOutputSource.openSubscription().tail(config.standardOutputBufferCharCount)
196197

197-
override val standardError: ReceiveChannel<Char> =
198-
_standardErrorSource.openSubscription().tail(config.standardErrorBufferCharCount)
198+
override val standardError: ReceiveChannel<Char>
199+
get() = TODO()
200+
// = _standardErrorSource.openSubscription().tail(config.standardErrorBufferCharCount)
199201

200202
// endregion
201203

202-
// region input
204+
// region inputsst
203205

204206
private val _standardInput: SendChannel<Char> by lazy { process.outputStream.toSendChannel(config) }
205207
private val inputLineLock = Mutex()
206208

207-
override val standardInput: SendChannel<Char> by lazy { config.scope.actor<Char> {
209+
override val standardInput: SendChannel<Char> by lazy { GlobalScope.actor<Char> {
208210
consumeEach {
209211
inputLineLock.withLock {
210212
_standardInput.send(it)
@@ -219,7 +221,7 @@ internal class RunningProcessImpl(
219221

220222
private val killed = AtomicBoolean(false)
221223

222-
private val _exitCode = config.scope.async(Unconfined + CoroutineName("process(PID=$processID)._exitcode")) {
224+
private val _exitCode = GlobalScope.async(CoroutineName("process(PID=$processID)._exitcode")) {
223225

224226
val result = processListenerProvider.exitCodeDeferred.value.await()
225227

@@ -233,7 +235,7 @@ internal class RunningProcessImpl(
233235
result
234236
}
235237

236-
private val errorHistory = config.scope.async<Queue<String>>(Unconfined + CoroutineName("process(PID=$processID).errorHistory")) {
238+
private val errorHistory = GlobalScope.async<Queue<String>>(Unconfined + CoroutineName("process(PID=$processID).errorHistory")) {
237239
val result = LinkedList<String>()
238240
if (config.linesForExceptionError > 0) {
239241
_standardErrorLines.openSubscription().consumeEach {
@@ -249,13 +251,14 @@ internal class RunningProcessImpl(
249251
//user-facing control root.
250252
override val exitCode: Deferred<Int> = CompletableDeferred<Int>().apply {
251253
//use launch rather than async to avoid throwing across coroutine boundary.
252-
config.scope.launch(Unconfined + CoroutineName("process(PID=$processID).exitCode")) {
254+
GlobalScope.launch(Unconfined + CoroutineName("process(PID=$processID).exitCode")) {
253255
val (result: Int?, ex: Throwable?) = try {
254256
val result = _exitCode.await()
257+
val expectedCodes = config.expectedOutputCodes
255258

256259
when {
257260
killed.get() -> throw CancellationException()
258-
result in config.expectedOutputCodes -> result to null
261+
expectedCodes == null || result in expectedCodes -> result to null
259262
else -> {
260263
val errorLines = errorHistory.await().toList()
261264
val exception = makeExitCodeException(config, result, errorLines)
@@ -277,8 +280,6 @@ internal class RunningProcessImpl(
277280
ex != null -> completeExceptionally(ex)
278281
result != null -> complete(result)
279282
}
280-
281-
val x = 4;
282283
}
283284

284285
}
@@ -327,7 +328,7 @@ internal class RunningProcessImpl(
327328
//region SendChannel
328329

329330
private val inputLines by lazy {
330-
config.scope.actor<String>(Unconfined) {
331+
GlobalScope.actor<String>(Unconfined) {
331332
val newlineString = System.lineSeparator()
332333
consumeEach { nextLine ->
333334
inputLineLock.withLock {
@@ -360,7 +361,7 @@ internal class RunningProcessImpl(
360361
private val aggregateChannel: ReceiveChannel<ProcessEvent> = when(config.aggregateOutputBufferLineCount){
361362
0 -> {
362363
val name = "aggregate[NoBufferedOutput, delay=$_exitCode]"
363-
val actual = config.scope.produce<ProcessEvent>(Unconfined + CoroutineName("Process(PID=$processID).$name"), capacity = 1){
364+
val actual = GlobalScope.produce<ProcessEvent>(Unconfined + CoroutineName("Process(PID=$processID).$name"), capacity = 1){
364365
val code = _exitCode.await()
365366
send(ExitCode(code))
366367

@@ -377,7 +378,7 @@ internal class RunningProcessImpl(
377378

378379
val name = "aggregate[out=$outputLines,err=$errorLines]"
379380

380-
val actual = config.scope.produce<ProcessEvent>(Unconfined + CoroutineName("Process(PID=$processID).$name")) {
381+
val actual = GlobalScope.produce<ProcessEvent>(Unconfined + CoroutineName("Process(PID=$processID).$name")) {
381382
try {
382383
var stderrWasNull = false
383384
var stdoutWasNull = false

Diff for: core/src/main/kotlin/groostav/kotlinx/exec/SimpleInlineMulticaster.kt

+4-3
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ class SimpleInlineMulticaster<T>(val name: String) {
1919
constructor(): this("anonymous-caster")
2020

2121
sealed class State<T> {
22-
data class Registration<T>(val subs: List<RendezvousChannel<T>> = emptyList()): State<T>()
23-
data class Running<T>(val subs: List<RendezvousChannel<T>> = emptyList()): State<T>()
22+
data class Registration<T>(val subs: List<Channel<T>> = emptyList()): State<T>()
23+
data class Running<T>(val subs: List<Channel<T>> = emptyList()): State<T>()
2424
class Closed<T>(): State<T>()
2525
}
2626

@@ -53,6 +53,7 @@ class SimpleInlineMulticaster<T>(val name: String) {
5353
try {
5454
source.consumeEach { next ->
5555
for (sub in newState.subs) {
56+
val x = 4;
5657
sub.send(next)
5758
// apply back-pressure from _all_ subs,
5859
// suspending the upstream until all children are satisfied.
@@ -88,7 +89,7 @@ class SimpleInlineMulticaster<T>(val name: String) {
8889
when(it){
8990
is State.Registration<T> -> {
9091

91-
val subscription = object: RendezvousChannel<T> by Channel(RENDEZVOUS) {
92+
val subscription = object: Channel<T> by Channel(5) {
9293
val id = it.subs.size+1
9394
override fun toString() = "sub$id-$name"
9495
}

Diff for: core/src/main/kotlin/groostav/kotlinx/exec/backend.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import java.io.InputStreamReader
55
import java.io.Reader
66
import java.nio.CharBuffer
77

8-
internal val TRACE = true
8+
internal val TRACE = java.lang.Boolean.getBoolean("kotlinx.exec.trace")
99

1010
internal inline fun trace(message: () -> String){
1111
if(TRACE){
@@ -129,7 +129,7 @@ data class IntProgressionSet(val src: IntProgression): Set<Int> {
129129
require(src.step >= 1)
130130
}
131131

132-
override val size: Int = (src.last - src.first + 1) / src.step
132+
override val size: Int = (src.last - src.first + 1) / src.step + 1
133133

134134
override operator fun contains(element: Int): Boolean {
135135
if(element < src.first || element > src.last) return false

Diff for: core/src/main/kotlin/groostav/kotlinx/exec/blocking.kt

+5-3
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ internal class ThreadBlockingListenerProvider(val process: Process, val pid: Int
3030
}
3131
override val exitCodeDeferred = run {
3232
val context = BlockableDispatcher + CoroutineName("blocking-process.WaitFor")
33-
Supported(config.scope.async(context) { process.waitFor() })
33+
Supported(GlobalScope.async(context) { process.waitFor() })
3434
}
3535

3636
/**
@@ -44,7 +44,7 @@ internal class ThreadBlockingListenerProvider(val process: Process, val pid: Int
4444
*/
4545
private fun Reader.toPumpedReceiveChannel(context: CoroutineContext = BlockableDispatcher): ReceiveChannel<Char> {
4646

47-
val result = config.scope.produce(context) {
47+
val result = GlobalScope.produce(context + BlockableDispatcher) {
4848

4949
while (isActive) {
5050
val nextCodePoint = read().takeUnless { it == EOF_VALUE }
@@ -89,7 +89,9 @@ internal class ThreadBlockingListenerProvider(val process: Process, val pid: Int
8989
runBlocking {
9090
val latch = CountDownLatch(jobs)
9191
for(jobId in 1 .. jobs){
92-
launch(CoroutineName("Prestarting-job$jobId-${this@BlockableDispatcher}")) { latch.countDown() }
92+
val name = CoroutineName("Prestarting-job$jobId-${this@BlockableDispatcher}")
93+
launch(name + BlockableDispatcher) {
94+
latch.countDown() }
9395
}
9496

9597
latch.await()

Diff for: core/src/main/kotlin/groostav/kotlinx/exec/exec.kt

+20-8
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package groostav.kotlinx.exec
33
import kotlinx.coroutines.CoroutineScope
44
import kotlinx.coroutines.GlobalScope
55
import kotlinx.coroutines.channels.*
6-
import kotlinx.coroutines.newCoroutineContext
7-
import kotlinx.coroutines.selects.select
86
import java.io.IOException
97
import java.lang.ProcessBuilder as JProcBuilder
108

@@ -99,10 +97,10 @@ suspend fun execVoid(commandFirst: String, vararg commandRest: String): Int = ex
9997
command = listOf(commandFirst) + commandRest.toList()
10098
}
10199

102-
class InvalidExitValueException internal constructor(
100+
class InvalidExitValueException(
103101
val command: List<String>,
104102
val exitValue: Int,
105-
val expectedExitCodes: Set<Int>,
103+
val expectedExitCodes: Set<Int>?,
106104
val recentStandardErrorLines: List<String>,
107105
message: String,
108106
stackTraceApplier: InvalidExitValueException.() -> Unit
@@ -118,22 +116,36 @@ class InvalidExitValueException internal constructor(
118116
}
119117
}
120118

119+
fun `kotlin is pretty smart`(){
120+
val nullableSet: Set<Int>? = setOf(1, 2, 3)
121+
122+
when(nullableSet?.size){
123+
null -> {}
124+
else -> { nullableSet.first() } //smart-cast knew that nullableSet isnt null through the ?. operator? wow.
125+
}
126+
}
127+
121128
internal fun makeExitCodeException(config: ProcessBuilder, exitCode: Int, recentErrorOutput: List<String>): Throwable {
129+
val expectedCodes = config.expectedOutputCodes
122130
val builder = StringBuilder().apply {
123131

124132
appendln("exec '${config.command.joinToString(" ")}'")
125133

126-
val multipleOutputs = config.expectedOutputCodes.size > 1
127-
val exitCodesScription = config.expectedOutputCodes.joinToString("', '")
128-
appendln("exited with code $exitCode (expected ${if(multipleOutputs) "one of " else ""}'$exitCodesScription')")
134+
val parentheticDescription = when(expectedCodes?.size){
135+
null -> "any exit value".also { TODO("How did you get here!?") }
136+
1 -> "${expectedCodes.single()}"
137+
in 2 .. Int.MAX_VALUE -> "one of ${expectedCodes.joinToString()}"
138+
else -> TODO()
139+
}
140+
appendln("exited with code $exitCode (expected $parentheticDescription)")
129141

130142
if(recentErrorOutput.any()){
131143
appendln("the most recent standard-error output was:")
132144
recentErrorOutput.forEach { appendln(it) }
133145
}
134146
}
135147

136-
val result = InvalidExitValueException(config.command, exitCode, config.expectedOutputCodes, recentErrorOutput, builder.toString()){
148+
val result = InvalidExitValueException(config.command, exitCode, expectedCodes, recentErrorOutput, builder.toString()){
137149
val source = config.source
138150
when(source){
139151
is AsynchronousExecutionStart -> {

Diff for: core/src/main/kotlin/groostav/kotlinx/exec/morechannels.kt

+11-8
Original file line numberDiff line numberDiff line change
@@ -161,9 +161,11 @@ internal fun <T> ReceiveChannel<T>.tail(bufferSize: Int): Channel<T> {
161161
GlobalScope.launch(Unconfined + CoroutineName(buffer.toString())) {
162162
try {
163163
if(bufferSize > 0) {
164-
for (item in this@tail) {
165-
buffer.pushForward(item)
164+
val y = 4;
165+
this@tail.consumeEach {
166+
buffer.pushForward(it)
166167
}
168+
val x = 4;
167169
}
168170
}
169171
finally {
@@ -175,10 +177,11 @@ internal fun <T> ReceiveChannel<T>.tail(bufferSize: Int): Channel<T> {
175177
}
176178

177179
private suspend inline fun <T> Channel<T>.pushForward(next: T){
178-
while (! isClosedForSend && !offer(next)) {
179-
val bumpedElement = receiveOrNull()
180-
if (bumpedElement != null){
181-
trace { "WARN: back-pressure forced drop '$bumpedElement' from ${this@pushForward}" }
182-
}
183-
}
180+
offer(next)
181+
// while (! isClosedForSend && !offer(next)) {
182+
// val bumpedElement = receiveOrNull()
183+
// if (bumpedElement != null){
184+
// trace { "WARN: back-pressure forced drop '$bumpedElement' from ${this@pushForward}" }
185+
// }
186+
// }
184187
}

Diff for: core/src/main/kotlin/groostav/kotlinx/exec/polling.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,16 @@ internal class PollingListenerProvider(val process: Process, val pid: Int, val c
3131
private @Volatile var manualEOF = false
3232

3333
override val standardErrorChannel = run {
34-
val context = Unconfined + CoroutineName("polling-process.stderr")
34+
val context = CoroutineName("polling-process.stderr")
3535
Supported(standardErrorReader.toPolledReceiveChannel(context, DelayMachine(PollPeriodWindow, otherSignals)))
3636
}
3737
override val standardOutputChannel = run {
38-
val context = Unconfined + CoroutineName("polling-process.stdout")
38+
val context = CoroutineName("polling-process.stdout")
3939
Supported(standardOutputReader.toPolledReceiveChannel(context, DelayMachine(PollPeriodWindow, otherSignals)))
4040
}
4141

4242
override val exitCodeDeferred = Supported(
43-
GlobalScope.async(Unconfined + CoroutineName("polling-process.waitFor")) {
43+
GlobalScope.async(CoroutineName("polling-process.waitFor")) {
4444
val delayMachine = DelayMachine(PollPeriodWindow, otherSignals)
4545
delayMachine.waitForByPollingPeriodically { ! process.isAlive }
4646
val result = process.exitValue()

0 commit comments

Comments
 (0)