Process.stdout.linesStream is not interruptible #425

@haja

Hey, if I run the program below, it hangs for the full 3 seconds of the sleep 3 command instead of failing after 1 second as timeoutFail(..) should make it.

import zio.durationInt
import zio.stream.ZSink
import zio.Clock
import zio.Scope
import zio.ZIO
import zio.ZIOAppArgs
import zio.ZIOAppDefault

object ProcessBug extends ZIOAppDefault {

  override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] =
    for {
      process <- ZIO.acquireRelease(
        ZIO.attempt(zio.process.Process(new ProcessBuilder("sleep", "3").start())).orDie
      )(process =>
        ZIO.debug("process cleanup") *>
          process.killTreeForcibly.orDie)

      _         <- ZIO.debug("sleep started")
      timestamp <- Clock.instant

      _ <- process.stdout.linesStream
        .run(ZSink.foreach(line => ZIO.debug(line)))
        .timeoutFail("stream timeout")(1.second)
        .tapError(_ =>
          for {
            afterTimeout <- Clock.instant
            _            <- ZIO.debug(s"time until error: ${afterTimeout.toEpochMilli - timestamp.toEpochMilli}ms")
          } yield ())
    } yield ()
}

Sample output:

sleep started
time until error: 2971ms
process cleanup
timestamp=2024-07-31T09:10:09.799879109Z level=ERROR thread=#zio-fiber-1 message="" cause="Exception in thread "zio-fiber-11" java.lang.String: stream timeout
	at <empty>.ProcessBugWorkaround.run(ProcessBug.scala:53)
	at <empty>.ProcessBugWorkaround.run(ProcessBug.scala:52)"

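The issue seems to be that zio.process.ProcessStream.stream (called by linesStream) uses ZStream.fromInputStreamZIO, which in turn reads from the InputStream via ZIO.attemptBlockingIO. That effect is not interruptible, so the timeout cannot take effect until the blocking read returns, which here happens only when the sleep process exits.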
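
To illustrate the suspected mechanism in isolation, here is a minimal sketch that does not use zio-process at all. It assumes ZIO 2.x; BlockingInterruptDemo, elapsedMillis and blockingCall are hypothetical names, and Thread.sleep is only a stand-in for the blocking read. It measures how long a 1-second timeout takes to return for attemptBlockingIO, for attemptBlockingInterrupt, and for attemptBlockingIO followed by disconnect.

```scala
import zio._

object BlockingInterruptDemo extends ZIOAppDefault {

  // Runs `effect` under a 1-second timeout and returns how long the
  // whole call (including any wait for interruption) took, in ms.
  def elapsedMillis(effect: Task[Unit]): Task[Long] =
    effect.timeout(1.second).timed.map { case (elapsed, _) => elapsed.toMillis }

  // Stand-in for a blocking read: parks the calling thread for 3 seconds.
  // Assumption: this only approximates what InputStream.read does.
  private def blockingCall(): Unit = Thread.sleep(3000)

  override def run: ZIO[Any, Throwable, Unit] =
    for {
      // Not interruptible: the timeout has to wait for the call to finish.
      plain <- elapsedMillis(ZIO.attemptBlockingIO(blockingCall()))
      _     <- ZIO.debug(s"attemptBlockingIO: ${plain}ms")

      // Interrupts the underlying thread, which Thread.sleep responds to.
      interrupt <- elapsedMillis(ZIO.attemptBlockingInterrupt(blockingCall()))
      _         <- ZIO.debug(s"attemptBlockingInterrupt: ${interrupt}ms")

      // disconnect moves the interruption to the background, so the
      // timeout returns without waiting for the blocking call to end.
      detached <- elapsedMillis(ZIO.attemptBlockingIO(blockingCall()).disconnect)
      _        <- ZIO.debug(s"attemptBlockingIO + disconnect: ${detached}ms")
    } yield ()
}
```

Only the first case should take roughly the full 3 seconds. Note that a real blocking InputStream.read may not react to thread interruption the way Thread.sleep does, so the second variant is not guaranteed to help with process output. The disconnect variant might be usable as a workaround (for example process.stdout.linesStream.run(...).disconnect.timeoutFail(...)), on the assumption that the blocked read is released once the scope finalizer kills the process; this has not been verified against zio-process.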
