Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 23 additions & 38 deletions frontend/src/main/scala/bloop/exec/Forker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,51 +69,36 @@ object Forker {
logger: Logger,
opts: CommonOptions
): Task[Int] = {
val consumeInput: Option[Cancelable] = None
@volatile var shutdownInput: Boolean = false

/* We need to gobble the input manually with a fixed delay because otherwise
* the remote process will not see it.
*
* The input gobble runs on a 50ms basis and it can process a maximum of 4096
* bytes at a time. The rest that is not read will be read in the next 50ms. */
def goobleInput(to: OutputStream): Cancelable = {
val duration = FiniteDuration(50, TimeUnit.MILLISECONDS)
ExecutionContext.ioScheduler.scheduleWithFixedDelay(duration, duration) {
val buffer = new Array[Byte](4096)
if (shutdownInput) {
consumeInput.foreach(_.cancel())
} else {
try {
if (opts.in.available() > 0) {
val read = opts.in.read(buffer, 0, buffer.length)
if (read == -1) ()
else {
to.write(buffer, 0, read)
to.flush()
}
}
} catch {
case t: IOException =>
logger.debug(s"Error from input gobbler: ${t.getMessage}")
logger.trace(t)
// Rethrow so that Monix cancels future scheduling of the same task
throw t
}
}
}
}

val runTask = run(
Some(cwd.underlying.toFile),
cmd,
logger,
opts.env.toMap,
writeToStdIn = outputStream => {
val mainCancellable = goobleInput(outputStream)
val thread = new Thread {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you able to add some tests? I can confirm that this is fixing the issues at hand

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tgodzik

I've added the tests as requested.
The test cases verify that the blocking reader thread approach fixes the stdin readline issue, covering:

  1. Input forwarding without data loss
  2. No indefinite waiting when reading from stdin
  3. Compatibility with forker-realib patterns
    4)Standard I/O patterns work correctly

Could you approve the workflow run so CI can execute the tests? Thanks!

override def run(): Unit = {
val buffer = new Array[Byte](4096)
try {
while (opts.in != null) {
val read = opts.in.read(buffer, 0, buffer.length)
if (read == -1) return
else {
outputStream.write(buffer, 0, read)
outputStream.flush()
}
}
} catch {
case t: IOException =>
logger.debug(s"Error from input reader: ${t.getMessage}")
logger.trace(t)
}
}
}

thread.start()

Cancelable { () =>
shutdownInput = true
mainCancellable.cancel()
thread.interrupt()
}
},
debugLog = msg => {
Expand Down