Skip to content

Commit 23f8499

Browse files
authored
Merge pull request #2684 from eugeniasimich/spaced
Added `spaced` function, similar to `metered` but waiting
2 parents 6163cc4 + 3409650 commit 23f8499

File tree

2 files changed

+68
-0
lines changed

2 files changed

+68
-0
lines changed

core/shared/src/main/scala/fs2/Stream.scala

+24
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,30 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
675675
def meteredStartImmediately[F2[x] >: F[x]: Temporal](rate: FiniteDuration): Stream[F2, O] =
676676
(Stream.emit(()) ++ Stream.fixedRate[F2](rate)).zipRight(this)
677677

678+
/** Waits the specified `delay` between each event.
679+
*
680+
* The resulting stream emits the same elements from `this` stream,
681+
* but split into singleton chunks. Between each chunk (element) it
682+
* adds a pause of a fixed `delay` duration.
683+
*
684+
* This method differs in the timing of elements from [[metered]].
685+
* The [[metered]] combinator takes a "schedule" for elements to be released,
686+
* and before each element introduces just the necessary delay to hit that time.
687+
* To do so, it deducts from the pause any delay caused by other effects
688+
* in the stream, or the pauses the stream consumer takes while pulling.
689+
* This method, instead, simply introduced a fixed sleep time between elements,
690+
* irrespective of other pauses in the stream or the consumer.
691+
*
692+
* Starts immediately, same as [[meteredStartImmediately]]
693+
* unless parameter `startImmediately` is set to false.
694+
*/
695+
def spaced[F2[x] >: F[x]: Temporal](
696+
delay: FiniteDuration,
697+
startImmediately: Boolean = true
698+
): Stream[F2, O] =
699+
((if (startImmediately) Stream.emit(()) else Stream.empty) ++ Stream.fixedDelay[F2](delay))
700+
.zipRight(this)
701+
678702
/** Logs the elements of this stream as they are pulled.
679703
*
680704
* By default, `toString` is called on each element and the result is printed

core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala

+44
Original file line numberDiff line numberDiff line change
@@ -1056,6 +1056,50 @@ class StreamCombinatorsSuite extends Fs2Suite {
10561056
.map(results => assert(results.size == 1))
10571057
}
10581058

1059+
test("spaced should start immediately if startImmediately is not set") {
1060+
Stream
1061+
.emit[IO, Int](1)
1062+
.repeatN(10)
1063+
.spaced(1.second)
1064+
.interruptAfter(500.milliseconds)
1065+
.compile
1066+
.toList
1067+
.map(results => assert(results.size == 1))
1068+
}
1069+
1070+
test("spaced should not start immediately if startImmediately is set to false") {
1071+
Stream
1072+
.emit[IO, Int](1)
1073+
.repeatN(10)
1074+
.spaced(1.second, startImmediately = false)
1075+
.interruptAfter(500.milliseconds)
1076+
.compile
1077+
.toList
1078+
.map(results => assert(results.isEmpty))
1079+
}
1080+
1081+
test("metered should not wait between events that last longer than the rate") {
1082+
Stream
1083+
.eval[IO, Int](IO.sleep(1.second).as(1))
1084+
.repeatN(10)
1085+
.metered(1.second)
1086+
.interruptAfter(4500.milliseconds)
1087+
.compile
1088+
.toList
1089+
.map(results => assert(results.size == 3))
1090+
}
1091+
1092+
test("spaced should wait between events") {
1093+
Stream
1094+
.eval[IO, Int](IO.sleep(1.second).as(1))
1095+
.repeatN(10)
1096+
.spaced(1.second)
1097+
.interruptAfter(4500.milliseconds)
1098+
.compile
1099+
.toList
1100+
.map(results => assert(results.size == 2))
1101+
}
1102+
10591103
test("mapAsyncUnordered") {
10601104
forAllF { (s: Stream[Pure, Int]) =>
10611105
val f = (_: Int) + 1

0 commit comments

Comments
 (0)