diff --git a/io/shared/src/main/scala/fs2/io/io.scala b/io/shared/src/main/scala/fs2/io/io.scala index 6308cf0f05..02639dcbce 100644 --- a/io/shared/src/main/scala/fs2/io/io.scala +++ b/io/shared/src/main/scala/fs2/io/io.scala @@ -21,10 +21,12 @@ package fs2 +import cats.effect.{Concurrent, Resource} import cats.effect.kernel.Async import cats.effect.kernel.Sync import cats.effect.syntax.all._ import cats.syntax.all._ +import fs2.io.file.{Files, Flags, Path, ReadCursor, WriteCursor} import java.io.{InputStream, OutputStream} @@ -151,4 +153,52 @@ package object io extends ioplatform { os.flatMap(os => useOs(os) ++ Stream.exec(F.blocking(os.flush()))) } + def fileBuffer[F[_]: Concurrent: Files]( + fileResource: Resource[F, Path], + chunkSize: Int + ): Pipe[F, Byte, Stream[F, Byte]] = { + def writeChunks( + writeCursor: WriteCursor[F], + stream: Stream[F, Byte] + ): Pull[F, WriteCursor[F], WriteCursor[F]] = + stream.pull.uncons.flatMap { + case Some((chunk, tail)) => + writeCursor.writePull(chunk).flatMap { writeCursor => + Pull.output1(writeCursor) >> + writeChunks(writeCursor, tail) + } + case None => Pull.pure(writeCursor) + } + + def readChunks( + readCursor: ReadCursor[F], + offsets: Stream[F, Long] + ): Pull[F, Byte, ReadCursor[F]] = + offsets.pull.uncons1.flatMap { + case Some((offset, tail)) => + readCursor.readUntil(chunkSize, offset).flatMap { readCursor => + readChunks(readCursor, tail) + } + case None => Pull.pure(readCursor) + } + + { (stream: Stream[F, Byte]) => + for { + file <- Stream.resource(fileResource) + offsets <- Stream.resource(Files[F].writeCursor(file, Flags.Write)).flatMap { writeCursor => + writeChunks(writeCursor, stream).void.stream + .map(e => (e.offset, true)) + .append(Stream.emit((0L, false))) + .scan1[(Long, Boolean)] { case ((lastOffset, _), (offset, hasNext)) => + (Math.max(lastOffset, offset), hasNext) + } + .hold1 + .map(_.discrete.takeThrough(_._2).map(_._1)) + } + } yield Stream.resource(Files[F].readCursor(file, Flags.Read)).flatMap { readCursor => + readChunks(readCursor, offsets).void.stream + } + } + } + }