Skip to content

Added output configurations. Related to #3170 #3543

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ private[io] object child_process {

var env: js.UndefOr[js.Dictionary[String]] = js.undefined

var stdio: js.UndefOr[js.Any] = js.undefined
}

@js.native
Expand Down
44 changes: 36 additions & 8 deletions io/js/src/main/scala/fs2/io/process/ProcessesPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,47 @@ private[process] trait ProcessesCompanionPlatform {
def spawn(process: ProcessBuilder): Resource[F, Process[F]] =
Resource {
F.async_[(Process[F], F[Unit])] { cb =>
val childProcess = facade.child_process.spawn(
process.command,
process.args.toJSArray,
new facade.child_process.SpawnOptions {
cwd = process.workingDirectory.fold[js.UndefOr[String]](js.undefined)(_.toString)
env =


val spawnOptions = new facade.child_process.SpawnOptions {
"cwd" -> process.workingDirectory.fold[js.UndefOr[String]](js.undefined)(_.toString)
"env" ->(
if (process.inheritEnv)
(facade.process.env ++ process.extraEnv).toJSDictionary
else
process.extraEnv.toJSDictionary
process.extraEnv.toJSDictionary)
}

val childProcess = facade.child_process.spawn(
process.command,
process.args.toJSArray,
spawnOptions.asInstanceOf[facade.child_process.SpawnOptions]
)

process.outputConfig.stdin match {
case StreamRedirect.Inherit => spawnOptions.stdio = "inherit".asInstanceOf[js.Any]
case StreamRedirect.Discard => spawnOptions.stdio = "ignore".asInstanceOf[js.Any]
case StreamRedirect.File(path) =>
spawnOptions.stdio = js.Array(path.toString, "pipe", "pipe")
case StreamRedirect.Pipe => //Default behaviour
}

process.outputConfig.stdout match {
case StreamRedirect.Inherit => spawnOptions.stdio = "inherit".asInstanceOf[js.Any]
case StreamRedirect.Discard => spawnOptions.stdio = "ignore".asInstanceOf[js.Any]
case StreamRedirect.File(path) =>
spawnOptions.stdio = js.Array(path.toString, "pipe", "pipe")
case StreamRedirect.Pipe => //Default behaviour
}

process.outputConfig.stderr match {
case StreamRedirect.Inherit => spawnOptions.stdio = "inherit".asInstanceOf[js.Any]
case StreamRedirect.Discard => spawnOptions.stdio = "ignore".asInstanceOf[js.Any]
case StreamRedirect.File(path) =>
spawnOptions.stdio = js.Array(path.toString, "pipe", "pipe")
case StreamRedirect.Pipe => //Default behaviour
}

val fs2Process = new UnsealedProcess[F] {

def isAlive: F[Boolean] = F.delay {
Expand Down Expand Up @@ -84,7 +112,7 @@ private[process] trait ProcessesCompanionPlatform {
} else {
childProcess.kill()
childProcess.once("exit", () => cb(Either.unit))
Left(None)
Left(None)
}
}
}
Expand Down
106 changes: 65 additions & 41 deletions io/jvm-native/src/main/scala/fs2/io/process/ProcessesPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import cats.effect.kernel.Resource
import cats.syntax.all.*
import fs2.io.CollectionCompat.*

import java.lang
import java.lang.ProcessBuilder.Redirect


private[process] trait ProcessesCompanionPlatform {
def forAsync[F[_]](implicit F: Async[F]): Processes[F] = new UnsealedProcesses[F] {
Expand All @@ -37,60 +38,83 @@ private[process] trait ProcessesCompanionPlatform {
Resource
.make {
F.blocking {
val builder = new lang.ProcessBuilder((process.command :: process.args).asJava)
val builder = new java.lang.ProcessBuilder((process.command :: process.args).asJava)

process.workingDirectory.foreach { path =>
builder.directory(path.toNioPath.toFile)
}

val env = builder.environment()
if (!process.inheritEnv) env.clear()
process.extraEnv.foreach { case (k, v) =>
env.put(k, v)
}

process.workingDirectory.foreach { path =>
builder.directory(path.toNioPath.toFile)
process.outputConfig.stdin match {
case StreamRedirect.Inherit => builder.redirectInput(Redirect.INHERIT)
case StreamRedirect.Discard => builder.redirectInput(Redirect.DISCARD)
case StreamRedirect.File(path) =>
builder.redirectInput(Redirect.from(path.toNioPath.toFile))
case StreamRedirect.Pipe =>
}

val env = builder.environment()
if (!process.inheritEnv) env.clear()
process.extraEnv.foreach { case (k, v) =>
env.put(k, v)
process.outputConfig.stdout match {
case StreamRedirect.Inherit => builder.redirectOutput(Redirect.INHERIT)
case StreamRedirect.Discard => builder.redirectOutput(Redirect.DISCARD)
case StreamRedirect.File(path) =>
builder.redirectOutput(Redirect.to(path.toNioPath.toFile))
case StreamRedirect.Pipe =>
}

builder.start()
}
} { process =>
F.delay(process.isAlive())
.ifM(
evalOnVirtualThreadIfAvailable(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deleted by mistake?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Must've been deleted while solving merge conflicts adding them back

F.blocking {
process.destroy()
process.waitFor()
()
}
),
F.unit
)
process.outputConfig.stderr match {
case StreamRedirect.Inherit => builder.redirectError(Redirect.INHERIT)
case StreamRedirect.Discard => builder.redirectError(Redirect.DISCARD)
case StreamRedirect.File(path) =>
builder.redirectError(Redirect.to(path.toNioPath.toFile))
case StreamRedirect.Pipe =>
}

builder.start()
}
.map { process =>
new UnsealedProcess[F] {
def isAlive = F.delay(process.isAlive())
} { process =>
F.delay(process.isAlive())
.ifM(
evalOnVirtualThreadIfAvailable(
F.blocking {
process.destroy()
process.waitFor()
()
}
),
F.unit
)
}
.map { process =>
new UnsealedProcess[F] {
def isAlive = F.delay(process.isAlive())

def exitValue = isAlive.ifM(
evalOnVirtualThreadIfAvailable(F.interruptible(process.waitFor())),
F.delay(process.exitValue())
)

def stdin = writeOutputStreamCancelable(
F.delay(process.getOutputStream()),
F.blocking(process.destroy())
)

def stdout = readInputStreamCancelable(
F.delay(process.getInputStream()),
F.blocking(process.destroy()),
8192
)
def stdin = writeOutputStreamCancelable(
F.delay(process.getOutputStream()),
F.blocking(process.destroy())
)

def stderr = readInputStreamCancelable(
F.delay(process.getErrorStream()),
F.blocking(process.destroy()),
8192
)
def stdout = readInputStreamCancelable(
F.delay(process.getInputStream()),
F.blocking(process.destroy()),
8192
)

}
def stderr = readInputStreamCancelable(
F.delay(process.getErrorStream()),
F.blocking(process.destroy()),
8192
)
}
}
}
}
40 changes: 37 additions & 3 deletions io/shared/src/main/scala/fs2/io/process/ProcessBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ sealed abstract class ProcessBuilder private {
*/
def workingDirectory: Option[Path]

/** Configures how stdout and stderr should be handled. */
def outputConfig: ProcessOutputConfig

/** @see [[command]] */
def withCommand(command: String): ProcessBuilder

Expand All @@ -67,17 +70,45 @@ sealed abstract class ProcessBuilder private {
/** @see [[workingDirectory]] */
def withCurrentWorkingDirectory: ProcessBuilder

/** @see [[outputMode]] */
def withOutputConfig(outputConfig: ProcessOutputConfig): ProcessBuilder
Comment on lines +73 to +74
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think separating it out into three config methods for stdin/stdout/stderr (like the JDK process builder) will make the API easier to use.


/* @param mode The mode for handling stdin
*/
def redirectInput(mode: StreamRedirect): ProcessBuilder =
withOutputConfig(outputConfig.copy(stdin = mode))

def redirectOutput(mode: StreamRedirect): ProcessBuilder =
withOutputConfig(outputConfig.copy(stdout = mode))

def redirectError(mode: StreamRedirect): ProcessBuilder =
withOutputConfig(outputConfig.copy(stderr = mode))

/** Starts the process and returns a handle for interacting with it.
* Closing the resource will kill the process if it has not already terminated.
*/
final def spawn[F[_]: Processes]: Resource[F, Process[F]] =
Processes[F].spawn(this)
}

sealed abstract class StreamRedirect
object StreamRedirect {
case object Pipe extends StreamRedirect
case object Inherit extends StreamRedirect
case object Discard extends StreamRedirect
final case class File(path: Path) extends StreamRedirect
}

final case class ProcessOutputConfig(
stdin: StreamRedirect = StreamRedirect.Pipe,
stdout: StreamRedirect = StreamRedirect.Pipe,
stderr: StreamRedirect = StreamRedirect.Pipe
)

object ProcessBuilder {

def apply(command: String, args: List[String]): ProcessBuilder =
ProcessBuilderImpl(command, args, true, Map.empty, None)
ProcessBuilderImpl(command, args, true, Map.empty, None, ProcessOutputConfig())

def apply(command: String, args: String*): ProcessBuilder =
apply(command, args.toList)
Expand All @@ -87,7 +118,8 @@ object ProcessBuilder {
args: List[String],
inheritEnv: Boolean,
extraEnv: Map[String, String],
workingDirectory: Option[Path]
workingDirectory: Option[Path],
outputConfig: ProcessOutputConfig
) extends ProcessBuilder {

def withCommand(command: String): ProcessBuilder = copy(command = command)
Expand All @@ -100,7 +132,9 @@ object ProcessBuilder {

def withWorkingDirectory(workingDirectory: Path): ProcessBuilder =
copy(workingDirectory = Some(workingDirectory))

def withCurrentWorkingDirectory: ProcessBuilder = copy(workingDirectory = None)
}

def withOutputConfig(outputConfig: ProcessOutputConfig): ProcessBuilder = copy(outputConfig = outputConfig)
}
}
1 change: 0 additions & 1 deletion io/shared/src/main/scala/fs2/io/process/Processes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import cats.effect.IO
import cats.effect.LiftIO
import cats.effect.kernel.Async
import cats.effect.kernel.Resource

sealed trait Processes[F[_]] {

def spawn(process: ProcessBuilder): Resource[F, Process[F]]
Expand Down
51 changes: 46 additions & 5 deletions io/shared/src/test/scala/fs2/io/process/ProcessSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,9 @@ class ProcessSuite extends Fs2IoSuite {
}

test("stdout and stderr") {
ProcessBuilder(
"node",
"-e",
"console.log('good day stdout'); console.error('how do you do stderr')"
).spawn[IO]
ProcessBuilder("node", "-e", "console.log('good day stdout'); console.error('how do you do stderr')")
.withOutputConfig(ProcessOutputConfig())
.spawn[IO]
.use { p =>
val testOut = p.stdout
.through(fs2.text.utf8.decode)
Expand All @@ -72,6 +70,49 @@ class ProcessSuite extends Fs2IoSuite {
}
}

test("merged stdout and stderr") {
ProcessBuilder("node", "-e", "console.log('merged stdout'); console.error('merged stderr')")
.withOutputConfig(ProcessOutputConfig(stdout = StreamRedirect.Pipe, stderr = StreamRedirect.Pipe))
.spawn[IO]
.use { p =>
p.stdout
.through(fs2.text.utf8.decode)
.compile
.string
.assert(s => s.contains("merged stdout") && s.contains("merged stderr"))
}
}

test("file output") {
Files[IO].tempFile.use { path =>
ProcessBuilder("echo", "file output test")
.withOutputConfig(ProcessOutputConfig(stdout = StreamRedirect.File(path)))
.spawn[IO]
.use(_.exitValue)
.assertEquals(0) *>
Files[IO].readUtf8(path).compile.string.assertEquals("file output test\n")
}
}

test("ignored output") {
ProcessBuilder("echo", "ignored output")
.withOutputConfig(ProcessOutputConfig(stdout = StreamRedirect.Discard))
.spawn[IO]
.use(_.exitValue)
.assertEquals(0)
}

test("stdin piping") {
ProcessBuilder("cat")
.withOutputConfig(ProcessOutputConfig(stdin = StreamRedirect.Pipe))
.spawn[IO]
.use { p =>
val input = Stream.emit("piped input test").through(fs2.text.utf8.encode).through(p.stdin).compile.drain
val output = p.stdout.through(fs2.text.utf8.decode).compile.string.assertEquals("piped input test")
input *> output
}
}

if (!isNative)
test("cat") {
ProcessBuilder("cat").spawn[IO].use { p =>
Expand Down
Loading