Skip to content

Commit 768039d

Browse files
authored
Merge pull request #3383 from mpilquist/topic/walk-performance
Improve performance of Files.walk on the JVM
2 parents ca6e270 + 375942e commit 768039d

File tree

10 files changed

+582
-16
lines changed

10 files changed

+582
-16
lines changed

build.sbt

+1-1
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@ import com.typesafe.tools.mima.core._
22

33
Global / onChangedBuildSource := ReloadOnSourceChanges
44

5-
ThisBuild / tlBaseVersion := "3.9"
5+
ThisBuild / tlBaseVersion := "3.10"
66

77
ThisBuild / organization := "co.fs2"
88
ThisBuild / organizationName := "Functional Streams for Scala"

io/js/src/main/scala/fs2/io/file/FilesPlatform.scala

+1-1
Original file line number | Diff line number | Diff line change
@@ -175,7 +175,7 @@ private[fs2] trait FilesCompanionPlatform {
175175
)
176176
).adaptError { case IOException(ex) => ex }
177177
else
178-
walk(path, Int.MaxValue, true).evalTap(deleteIfExists).compile.drain
178+
walk(path, WalkOptions.Default.withFollowLinks(true)).evalTap(deleteIfExists).compile.drain
179179

180180
override def exists(path: Path, followLinks: Boolean): F[Boolean] =
181181
(if (followLinks)

io/jvm-native/src/main/scala/fs2/io/file/FilesPlatform.scala

+140-4
Original file line number | Diff line number | Diff line change
@@ -27,20 +27,21 @@ import cats.effect.kernel.{Async, Resource, Sync}
2727
import cats.syntax.all._
2828

2929
import java.nio.channels.{FileChannel, SeekableByteChannel}
30-
import java.nio.file.{Files => JFiles, Path => JPath, _}
30+
import java.nio.file.{Files => JFiles, Path => JPath, FileSystemLoopException => _, _}
3131
import java.nio.file.attribute.{
3232
BasicFileAttributeView,
3333
BasicFileAttributes => JBasicFileAttributes,
3434
PosixFileAttributes => JPosixFileAttributes,
35-
PosixFilePermissions
35+
PosixFilePermissions,
36+
FileTime
3637
}
3738
import java.security.Principal
3839
import java.util.stream.{Stream => JStream}
3940

4041
import scala.concurrent.duration._
42+
import scala.util.control.NonFatal
4143

4244
import fs2.io.CollectionCompat._
43-
import java.nio.file.attribute.FileTime
4445

4546
private[file] trait FilesPlatform[F[_]] extends DeprecatedFilesApi[F] { self: Files[F] =>
4647

@@ -91,7 +92,8 @@ private[file] trait FilesCompanionPlatform {
9192
private case class NioFileKey(value: AnyRef) extends FileKey
9293

9394
private final class AsyncFiles[F[_]](protected implicit val F: Async[F])
94-
extends Files.UnsealedFiles[F] {
95+
extends Files.UnsealedFiles[F]
96+
with AsyncFilesPlatform[F] {
9597

9698
def copy(source: Path, target: Path, flags: CopyFlags): F[Unit] =
9799
Sync[F].blocking {
@@ -389,6 +391,140 @@ private[file] trait FilesCompanionPlatform {
389391
.resource(Resource.fromAutoCloseable(javaCollection))
390392
.flatMap(ds => Stream.fromBlockingIterator[F](collectionIterator(ds), pathStreamChunkSize))
391393

394+
/** Walks the file tree rooted at `start` in one blocking pass and emits every visited
  * path as a single chunk.
  *
  * The traversal is delegated to `java.nio.file.Files.walkFileTree`, honouring
  * `options.followLinks` and `options.maxDepth`. Directories are recorded when they are
  * entered and files when they are visited. Entries that fail with an `IOException`
  * are skipped, except for file system loops, which are recorded when
  * `options.allowCycles` is set and rethrown otherwise.
  *
  * The walk runs inside `Sync[F].interruptible`; the visitor polls the thread's
  * interrupt flag on each callback and terminates the traversal when it is set.
  */
protected def walkEager(start: Path, options: WalkOptions): Stream[F, Path] = {
  val collectAll = Sync[F].interruptible {
    val paths = Vector.newBuilder[Path]

    // Appends `path` to the result and lets the traversal proceed.
    def record(path: JPath): FileVisitResult = {
      paths += Path.fromNioPath(path)
      FileVisitResult.CONTINUE
    }

    // Terminates the traversal when this thread has been interrupted
    // (`Thread.interrupted()` also clears the flag); otherwise evaluates `next`.
    def unlessInterrupted(next: => FileVisitResult): FileVisitResult =
      if (Thread.interrupted()) FileVisitResult.TERMINATE else next

    val visitOptions: java.util.Set[FileVisitOption] =
      (if (options.followLinks) Set(FileVisitOption.FOLLOW_LINKS)
       else Set.empty[FileVisitOption]).asJava

    val visitor = new SimpleFileVisitor[JPath] {
      override def visitFile(file: JPath, attrs: JBasicFileAttributes): FileVisitResult =
        unlessInterrupted(record(file))

      override def visitFileFailed(file: JPath, t: IOException): FileVisitResult =
        t match {
          // A loop is either reported as an ordinary entry or surfaced to the caller.
          case _: FileSystemLoopException if options.allowCycles => record(file)
          case _: FileSystemLoopException                        => throw t
          // Any other failure just skips the offending entry.
          case _ => FileVisitResult.CONTINUE
        }

      override def preVisitDirectory(
          dir: JPath,
          attrs: JBasicFileAttributes
      ): FileVisitResult =
        unlessInterrupted(record(dir))

      override def postVisitDirectory(dir: JPath, t: IOException): FileVisitResult =
        unlessInterrupted(FileVisitResult.CONTINUE)
    }

    JFiles.walkFileTree(start.toNioPath, visitOptions, options.maxDepth, visitor)
    Chunk.from(paths.result())
  }

  Stream.eval(collectAll).flatMap(chunk => Stream.chunk(chunk))
}
431+
432+
/** A pending item of the just-in-time walk.
  *
  * @param path     the path to emit
  * @param attr     attributes read for `path`
  * @param depth    distance from the start of the walk (the start itself is `0`)
  * @param ancestry identities of the directories above this entry, nearest first;
  *                 each is a file key when one is available, otherwise a path
  */
private case class WalkEntry(
    path: Path,
    attr: JBasicFileAttributes,
    depth: Int,
    ancestry: List[Either[Path, NioFileKey]]
)
438+
439+
/** Walks the file tree rooted at `start` incrementally, emitting paths in chunks of at
  * most `options.chunkSize` elements.
  *
  * Each chunk is produced by one `Sync[F].interruptible` step that pops entries from a
  * work queue, emits them, and pushes the children of directories in front of the
  * remaining work, so the traversal is depth-first and parents precede children.
  * Entries deeper than `options.maxDepth` are emitted but not expanded.
  *
  * Symbolic links are expanded only when `options.followLinks` is set. Before a link is
  * followed, its target is compared against every directory on the current ancestry
  * chain; a match is a cycle, which is silently not expanded when `options.allowCycles`
  * is set and raises `FileSystemLoopException` otherwise.
  *
  * Failures while listing a directory or reading the attributes of an individual
  * entry are swallowed: the offending directory or entry is skipped. A failure to
  * read the attributes of `start` itself is suppressed by `mask`, yielding an empty
  * stream.
  */
protected def walkJustInTime(
    start: Path,
    options: WalkOptions
): Stream[F, Path] = {
  import scala.collection.immutable.Queue

  // Guard against a non-positive chunk size, which would otherwise make every step
  // emit an empty chunk without consuming any work, looping forever.
  val chunkLimit = math.max(options.chunkSize, 1)

  // Identity used for cycle detection: the file key when the file system provides one,
  // otherwise the path (compared later with `isSameFile`).
  def identityOf(attr: JBasicFileAttributes, path: Path): Either[Path, NioFileKey] =
    Option(attr.fileKey).map(NioFileKey(_)).toRight(path)

  // Decides whether `entry` should be expanded. Returns the identity to record on the
  // ancestry chain of its children, or `None` when the entry must not be descended into.
  def descendInto(entry: WalkEntry): Option[Either[Path, NioFileKey]] =
    if (entry.attr.isDirectory) Some(identityOf(entry.attr, entry.path))
    else if (options.followLinks && entry.attr.isSymbolicLink)
      try {
        // `entry.attr` describes the link itself; re-read following the link.
        val targetAttr =
          JFiles.readAttributes(entry.path.toNioPath, classOf[JBasicFileAttributes])
        val targetKey = Option(targetAttr.fileKey).map(NioFileKey(_))
        val isCycle = entry.ancestry.exists {
          case Right(ancestorKey) =>
            targetKey.contains(ancestorKey)
          case Left(ancestorPath) =>
            JFiles.isSameFile(entry.path.toNioPath, ancestorPath.toNioPath)
        }
        if (isCycle)
          if (options.allowCycles) None
          else throw new FileSystemLoopException(entry.path.toString)
        else
          // Record the identity of the link's *target*, not of the link: descendants
          // are compared against target identities, so recording the link's own key
          // would let a loop reached through this link go undetected.
          Some(targetKey.toRight(entry.path))
      } catch {
        case t: FileSystemLoopException => throw t
        // Broken link, permission problem, etc.: treat as a leaf.
        case NonFatal(_) => None
      }
    else None

  def loop(toWalk0: Queue[WalkEntry]): Stream[F, Path] = {
    val partialWalk = Sync[F].interruptible {
      var acc = Vector.empty[Path]
      var toWalk = toWalk0

      // `Thread.interrupted()` clears the flag; the step then returns what it has.
      while (acc.size < chunkLimit && toWalk.nonEmpty && !Thread.interrupted()) {
        val entry = toWalk.head
        toWalk = toWalk.drop(1)
        acc = acc :+ entry.path
        if (entry.depth < options.maxDepth)
          descendInto(entry) match {
            case Some(identity) =>
              // Shared by all children of this directory.
              val childAncestry = identity :: entry.ancestry
              try {
                val listing = JFiles.list(entry.path.toNioPath)
                try {
                  val descendants = listing.iterator.asScala.flatMap { p =>
                    try
                      Some(
                        WalkEntry(
                          Path.fromNioPath(p),
                          JFiles.readAttributes(
                            p,
                            classOf[JBasicFileAttributes],
                            LinkOption.NOFOLLOW_LINKS
                          ),
                          entry.depth + 1,
                          childAncestry
                        )
                      )
                    catch {
                      // Entry vanished or is unreadable: skip it.
                      case NonFatal(_) => None
                    }
                  }
                  // Forces the lazy iterator while the listing is still open and puts
                  // the children ahead of the remaining work (depth-first order).
                  toWalk = Queue.empty ++ descendants ++ toWalk
                } finally listing.close()
              } catch {
                // Not a directory, unreadable, etc.: nothing to expand.
                case NonFatal(_) => ()
              }
            case None => ()
          }
      }

      Stream.chunk(Chunk.from(acc)) ++ (if (toWalk.isEmpty) Stream.empty else loop(toWalk))
    }
    Stream.eval(partialWalk).flatten
  }

  Stream
    .eval(Sync[F].interruptible {
      WalkEntry(
        start,
        JFiles.readAttributes(start.toNioPath, classOf[JBasicFileAttributes]),
        0,
        Nil
      )
    })
    .mask
    .flatMap(w => loop(Queue(w)))
}
527+
392528
def createWatcher: Resource[F, Watcher[F]] = Watcher.default(this, F)
393529

394530
def watch(

io/jvm-native/src/test/scala/fs2/io/file/BaseFileSuite.scala

+7-2
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@ import java.nio.file.{Files => JFiles, Path => JPath, _}
3030
import java.nio.file.attribute.{BasicFileAttributes => JBasicFileAttributes}
3131

3232
import scala.concurrent.duration._
33+
import scala.util.control.NonFatal
3334

3435
trait BaseFileSuite extends Fs2Suite {
3536

@@ -77,11 +78,15 @@ trait BaseFileSuite extends Fs2Suite {
7778
dir.toNioPath,
7879
new SimpleFileVisitor[JPath] {
7980
override def visitFile(path: JPath, attrs: JBasicFileAttributes) = {
80-
JFiles.delete(path)
81+
try JFiles.deleteIfExists(path)
82+
catch { case NonFatal(_) => () }
8183
FileVisitResult.CONTINUE
8284
}
85+
override def visitFileFailed(path: JPath, e: IOException) =
86+
FileVisitResult.CONTINUE
8387
override def postVisitDirectory(path: JPath, e: IOException) = {
84-
JFiles.delete(path)
88+
try JFiles.deleteIfExists(path)
89+
catch { case NonFatal(_) => () }
8590
FileVisitResult.CONTINUE
8691
}
8792
}
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright (c) 2013 Functional Streams for Scala
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of
5+
* this software and associated documentation files (the "Software"), to deal in
6+
* the Software without restriction, including without limitation the rights to
7+
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
8+
* the Software, and to permit persons to whom the Software is furnished to do so,
9+
* subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in all
12+
* copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
16+
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
17+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
18+
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19+
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20+
*/
21+
22+
package fs2
23+
package io
24+
package file
25+
26+
/** Platform hook that selects the strategy used by `walk`. */
private[file] trait AsyncFilesPlatform[F[_]] { self: Files.UnsealedFiles[F] =>

  /** Walks the tree rooted at `start`.
    *
    * A chunk size of `Int.MaxValue` selects [[walkEager]]; any other chunk size
    * selects [[walkJustInTime]].
    */
  override def walk(
      start: Path,
      options: WalkOptions
  ): Stream[F, Path] =
    options.chunkSize match {
      case Int.MaxValue => walkEager(start, options)
      case _            => walkJustInTime(start, options)
    }

  /** Strategy used when `options.chunkSize` is `Int.MaxValue`. */
  protected def walkEager(start: Path, options: WalkOptions): Stream[F, Path]

  /** Strategy used for every other chunk size. */
  protected def walkJustInTime(
      start: Path,
      options: WalkOptions
  ): Stream[F, Path]
}
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright (c) 2013 Functional Streams for Scala
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of
5+
* this software and associated documentation files (the "Software"), to deal in
6+
* the Software without restriction, including without limitation the rights to
7+
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
8+
* the Software, and to permit persons to whom the Software is furnished to do so,
9+
* subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in all
12+
* copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
16+
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
17+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
18+
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19+
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20+
*/
21+
22+
package fs2
23+
package io
24+
package file
25+
26+
import cats.effect.IO
27+
import java.io.File
28+
import scala.concurrent.duration.*
29+
30+
/** Coarse timing and interruption checks for `Files.walk`, run against a directory tree
  * generated in `beforeAll` and removed again in `afterAll`.
  */
class WalkBenchmark extends Fs2IoSuite {

  override def munitIOTimeout = 5.minutes

  // Root of the generated fixture tree; assigned in `beforeAll`.
  private var target: Path = _

  override def beforeAll() = {
    super.beforeAll()
    // Create the directory atomically instead of createTempFile + delete + mkdir,
    // which leaves a window in which another process can claim the name.
    val file = java.nio.file.Files.createTempDirectory("fs2-benchmarks-walk-").toFile
    target = Path(file.toString)

    val MaxDepth = 7
    val Names = 'A'.to('E').toList.map(_.toString)

    // Builds MaxDepth levels of five directories each, with five files in every
    // directory of the deepest level.
    def loop(cwd: File, depth: Int): Unit =
      if (depth < MaxDepth) {
        Names.foreach { name =>
          val sub = new File(cwd, name)
          sub.mkdir()
          loop(sub, depth + 1)
        }
      } else {
        // Leaf level: plain files, nothing further to recurse into.
        Names.foreach { name =>
          val sub = new File(cwd, name)
          sub.createNewFile()
        }
      }

    loop(file, 0)
  }

  override def afterAll() = {
    // Best-effort removal of the fixture tree so repeated runs do not fill the
    // temporary directory.
    if (target ne null)
      try deleteRecursively(target.toNioPath)
      catch { case scala.util.control.NonFatal(_) => () }
    super.afterAll()
  }

  /** Deletes `root` and everything beneath it (children before parents). */
  private def deleteRecursively(root: java.nio.file.Path): Unit = {
    import java.nio.file.{FileVisitResult, SimpleFileVisitor}
    import java.nio.file.attribute.BasicFileAttributes

    val _ = java.nio.file.Files.walkFileTree(
      root,
      new SimpleFileVisitor[java.nio.file.Path] {
        override def visitFile(
            file: java.nio.file.Path,
            attrs: BasicFileAttributes
        ): FileVisitResult = {
          java.nio.file.Files.deleteIfExists(file)
          FileVisitResult.CONTINUE
        }
        override def postVisitDirectory(
            dir: java.nio.file.Path,
            e: java.io.IOException
        ): FileVisitResult = {
          java.nio.file.Files.deleteIfExists(dir)
          FileVisitResult.CONTINUE
        }
      }
    )
    ()
  }

  /** Evaluates `f` once and returns the elapsed wall-clock time. */
  def time[A](f: => A): FiniteDuration = {
    val start = System.nanoTime()
    val _ = f
    (System.nanoTime() - start).nanos
  }

  test("Files.walk has similar performance to java.nio.file.Files.walk") {
    val fs2Time = time(
      Files[IO]
        .walk(target)
        .compile
        .count
        .unsafeRunSync()
    )
    val fs2EagerTime = time(
      Files[IO]
        .walk(target, WalkOptions.Eager)
        .compile
        .count
        .unsafeRunSync()
    )
    val nioTime = time {
      // The stream returned by Files.walk holds open directories and must be closed.
      val paths = java.nio.file.Files.walk(target.toNioPath)
      try paths.count()
      finally paths.close()
    }
    val epsilon = nioTime.toNanos * 1.5
    println(s"fs2 took: ${fs2Time.toMillis} ms")
    println(s"fs2 eager took: ${fs2EagerTime.toMillis} ms")
    println(s"nio took: ${nioTime.toMillis} ms")
    assert(
      (fs2Time - nioTime).toNanos.abs < epsilon,
      s"fs2 time: $fs2Time, nio time: $nioTime, diff: ${fs2Time - nioTime}"
    )
  }

  test("walk is interruptible") {
    val elapsed = time(
      Files[IO]
        .walk(target)
        .interruptAfter(1.second)
        .compile
        .count
        .unsafeRunSync()
    )
    assert(elapsed < 1250.milliseconds)
  }

  test("walk eager is interruptible") {
    val elapsed = time(
      Files[IO]
        .walk(target, WalkOptions.Eager)
        .interruptAfter(1.second)
        .compile
        .count
        .unsafeRunSync()
    )
    assert(elapsed < 1250.milliseconds)
  }
}

0 commit comments

Comments
 (0)