From 6884aacc502e64a41b724277a8742fe8023cea7a Mon Sep 17 00:00:00 2001 From: Pierre Kisters Date: Thu, 16 Nov 2023 14:56:00 +0100 Subject: [PATCH 1/3] implemented fileBuffer --- io/shared/src/main/scala/fs2/io/io.scala | 53 ++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/io/shared/src/main/scala/fs2/io/io.scala b/io/shared/src/main/scala/fs2/io/io.scala index 6308cf0f05..aef5fe8617 100644 --- a/io/shared/src/main/scala/fs2/io/io.scala +++ b/io/shared/src/main/scala/fs2/io/io.scala @@ -21,10 +21,12 @@ package fs2 +import cats.effect.{Concurrent, Resource} import cats.effect.kernel.Async import cats.effect.kernel.Sync import cats.effect.syntax.all._ import cats.syntax.all._ +import fs2.io.file.{Files, Flags, Path, ReadCursor, WriteCursor} import java.io.{InputStream, OutputStream} @@ -151,4 +153,55 @@ package object io extends ioplatform { os.flatMap(os => useOs(os) ++ Stream.exec(F.blocking(os.flush()))) } + def fileBuffer[F[_]: Concurrent: Files]( + fileResource: Resource[F, Path], + chunkSize: Int + ): Pipe[F, Byte, Stream[F, Byte]] = { + def writeChunks( + writeCursor: WriteCursor[F], + stream: Stream[F, Byte] + ): Pull[F, WriteCursor[F], WriteCursor[F]] = + stream.pull.uncons.flatMap { + case Some((chunk, tail)) => + writeCursor.writePull(chunk).flatMap { writeCursor => + Pull.output1(writeCursor) >> + writeChunks(writeCursor, tail) + } + case None => Pull.pure(writeCursor) + } + + def readChunks( + readCursor: ReadCursor[F], + offsets: Stream[F, Long] + ): Pull[F, Byte, ReadCursor[F]] = + offsets.pull.uncons1.flatMap { + case Some((offset, tail)) => + readCursor.readUntil(chunkSize, offset).flatMap { readCursor => + readChunks(readCursor, tail) + } + case None => Pull.pure(readCursor) + } + + { (stream: Stream[F, Byte]) => + for { + file <- Stream.resource(fileResource) + offsets <- Stream.resource(Files[F].writeCursor(file, Flags.Write)).flatMap { writeCursor => + writeChunks(writeCursor, stream).void.stream + .map(_.offset) + .noneTerminate + .zipWithPrevious + .map[(Long, Boolean)] { + case (_, Some(offset)) => (offset, true) + case (Some(Some(offset)), None) => (offset, false) + case _ => (0L, false) + } + .hold1 + .map(_.discrete.takeThrough(_._2).map(_._1)) + } + } yield Stream.resource(Files[F].readCursor(file, Flags.Read)).flatMap { readCursor => + readChunks(readCursor, offsets).void.stream + } + } + } + } From c23605b8940a68d8b40acf92add527da17c6bc5f Mon Sep 17 00:00:00 2001 From: Pierre Kisters Date: Thu, 16 Nov 2023 15:17:29 +0100 Subject: [PATCH 2/3] simplify --- io/shared/src/main/scala/fs2/io/io.scala | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/io/shared/src/main/scala/fs2/io/io.scala b/io/shared/src/main/scala/fs2/io/io.scala index aef5fe8617..2e4c1a24ef 100644 --- a/io/shared/src/main/scala/fs2/io/io.scala +++ b/io/shared/src/main/scala/fs2/io/io.scala @@ -187,13 +187,10 @@ package object io extends ioplatform { file <- Stream.resource(fileResource) offsets <- Stream.resource(Files[F].writeCursor(file, Flags.Write)).flatMap { writeCursor => writeChunks(writeCursor, stream).void.stream - .map(_.offset) - .noneTerminate - .zipWithPrevious - .map[(Long, Boolean)] { - case (_, Some(offset)) => (offset, true) - case (Some(Some(offset)), None) => (offset, false) - case _ => (0L, false) + .map(e => (e.offset, true)) + .append(Stream.emit((0L, false))) + .scan1 { case ((lastOffset, _), (offset, hasNext)) => + (Math.max(lastOffset, offset), hasNext) } .hold1 .map(_.discrete.takeThrough(_._2).map(_._1)) From a4a6aece1419f89a6cc02a9bb3dad5d71c73740d Mon Sep 17 00:00:00 2001 From: Pierre Kisters Date: Thu, 16 Nov 2023 15:33:55 +0100 Subject: [PATCH 3/3] explicit type for scan1 --- io/shared/src/main/scala/fs2/io/io.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/shared/src/main/scala/fs2/io/io.scala b/io/shared/src/main/scala/fs2/io/io.scala index 2e4c1a24ef..02639dcbce 100644 --- a/io/shared/src/main/scala/fs2/io/io.scala +++ b/io/shared/src/main/scala/fs2/io/io.scala @@ -189,7 +189,7 @@ package object io extends ioplatform { writeChunks(writeCursor, stream).void.stream .map(e => (e.offset, true)) .append(Stream.emit((0L, false))) - .scan1 { case ((lastOffset, _), (offset, hasNext)) => + .scan1[(Long, Boolean)] { case ((lastOffset, _), (offset, hasNext)) => (Math.max(lastOffset, offset), hasNext) } .hold1