Skip to content

Commit ccae9c7

Browse files
authored
Merge pull request #4059 from biochimia/timeout-returns-effect-outcome
Make `timeout`/`timeoutTo` always return the outcome of the effect
2 parents 8c305d7 + 718c78e commit ccae9c7

File tree

4 files changed

+177
-56
lines changed

4 files changed

+177
-56
lines changed

core/shared/src/main/scala/cats/effect/IO.scala

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -786,17 +786,27 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
786786
andWait(duration: Duration)
787787

788788
/**
789-
* Returns an IO that either completes with the result of the source within the specified time
790-
* `duration` or otherwise raises a `TimeoutException`.
789+
* Returns an IO that either completes with the result of the source or otherwise raises a
790+
* `TimeoutException`.
791791
*
792-
* The source is canceled in the event that it takes longer than the specified time duration
793-
* to complete. Once the source has been successfully canceled (and has completed its
794-
* finalizers), the `TimeoutException` will be raised. If the source is uncancelable, the
795-
* resulting effect will wait for it to complete before raising the exception.
792+
* The source is raced against the timeout `duration`, and its cancelation is triggered if the
793+
* source doesn't complete within the specified time. The resulting effect will always wait
794+
* for the source effect to complete (and to complete its finalizers), and will return the
795+
* source's outcome over raising a `TimeoutException`.
796+
*
797+
* In case source and timeout complete simultaneously, the result of the source will be
798+
* returned over raising a `TimeoutException`.
799+
*
800+
* If the source effect is uncancelable, a `TimeoutException` will never be raised.
796801
*
797802
* @param duration
798-
* is the time span for which we wait for the source to complete; in the event that the
799-
* specified time has passed without the source completing, a `TimeoutException` is raised
803+
* is the time span for which we wait for the source to complete before triggering its
804+
* cancelation; in the event that the specified time has passed without the source
805+
* completing, a `TimeoutException` is raised
806+
*
807+
* @see
808+
* [[timeoutAndForget]] for a variant which does not wait for cancelation of the source
809+
* effect to complete.
800810
*/
801811
def timeout[A2 >: A](duration: Duration): IO[A2] =
802812
handleDuration(duration, this) { finiteDuration =>
@@ -809,26 +819,35 @@ sealed abstract class IO[+A] private () extends IOPlatform[A] {
809819
timeout(duration: Duration)
810820

811821
/**
812-
* Returns an IO that either completes with the result of the source within the specified time
813-
* `duration` or otherwise evaluates the `fallback`.
822+
* Returns an IO that either completes with the result of the source or otherwise evaluates
823+
* the `fallback`.
814824
*
815-
* The source is canceled in the event that it takes longer than the specified time duration
816-
* to complete. Once the source has been successfully canceled (and has completed its
817-
* finalizers), the fallback will be sequenced. If the source is uncancelable, the resulting
818-
* effect will wait for it to complete before evaluating the fallback.
825+
* The source is raised against the timeout `duration`, and its cancelation is triggered if
826+
* the source doesn't complete within the specified time. The resulting effect will always
827+
* wait for the source effect to complete (and to complete its finalizers), and will return
828+
* the source's outcome over sequencing the `fallback`.
829+
*
830+
* In case source and timeout complete simultaneously, the result of the source will be
831+
* returned over sequencing the `fallback`.
832+
*
833+
* If the source in uncancelable, `fallback` will never be evaluated.
819834
*
820835
* @param duration
821-
* is the time span for which we wait for the source to complete; in the event that the
822-
* specified time has passed without the source completing, the `fallback` gets evaluated
836+
* is the time span for which we wait for the source to complete before triggering its
837+
* cancelation; in the event that the specified time has passed without the source
838+
* completing, the `fallback` gets evaluated
823839
*
824840
* @param fallback
825841
* is the task evaluated after the duration has passed and the source canceled
826842
*/
827843
def timeoutTo[A2 >: A](duration: Duration, fallback: IO[A2]): IO[A2] = {
828844
handleDuration[IO[A2]](duration, this) { finiteDuration =>
829-
race(IO.sleep(finiteDuration)).flatMap {
830-
case Right(_) => fallback
831-
case Left(value) => IO.pure(value)
845+
IO.uncancelable { poll =>
846+
poll(racePair(IO.sleep(finiteDuration))) flatMap {
847+
case Left((oc, f)) => f.cancel *> oc.embed(poll(IO.canceled) *> IO.never)
848+
case Right((f, _)) =>
849+
f.cancel *> f.join.flatMap { oc => oc.fold(fallback, IO.raiseError, identity) }
850+
}
832851
}
833852
}
834853
}

kernel/shared/src/main/scala/cats/effect/kernel/GenTemporal.scala

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -72,17 +72,23 @@ trait GenTemporal[F[_], E] extends GenConcurrent[F, E] with Clock[F] {
7272
productL(fa)(sleep(time))
7373

7474
/**
75-
* Returns an effect that either completes with the result of the source within the specified
76-
* time `duration` or otherwise evaluates the `fallback`.
75+
* Returns an effect that either completes with the result of the source or otherwise
76+
* evaluates the `fallback`.
7777
*
78-
* The source is canceled in the event that it takes longer than the specified time duration
79-
* to complete. Once the source has been successfully canceled (and has completed its
80-
* finalizers), the fallback will be sequenced. If the source is uncancelable, the resulting
81-
* effect will wait for it to complete before evaluating the fallback.
78+
* The source is raised against the timeout `duration`, and its cancelation is triggered if
79+
* the source doesn't complete within the specified time. The resulting effect will always
80+
* wait for the source effect to complete (and to complete its finalizers), and will return
81+
* the source's outcome over sequencing the `fallback`.
82+
*
83+
* In case source and timeout complete simultaneously, the result of the source will be
84+
* returned over sequencing the `fallback`.
85+
*
86+
* If the source in uncancelable, `fallback` will never be evaluated.
8287
*
8388
* @param duration
84-
* The time span for which we wait for the source to complete; in the event that the
85-
* specified time has passed without the source completing, the `fallback` gets evaluated
89+
* The time span for which we wait for the source to complete before triggering its
90+
* cancelation; in the event that the specified time has passed without the source
91+
* completing, the `fallback` gets evaluated
8692
*
8793
* @param fallback
8894
* The task evaluated after the duration has passed and the source canceled
@@ -91,33 +97,53 @@ trait GenTemporal[F[_], E] extends GenConcurrent[F, E] with Clock[F] {
9197
handleDuration(duration, fa)(timeoutTo(fa, _, fallback))
9298

9399
protected def timeoutTo[A](fa: F[A], duration: FiniteDuration, fallback: F[A]): F[A] =
94-
flatMap(race(fa, sleep(duration))) {
95-
case Left(a) => pure(a)
96-
case Right(_) => fallback
100+
uncancelable { poll =>
101+
implicit val F: GenTemporal[F, E] = this
102+
103+
poll(racePair(fa, sleep(duration))) flatMap {
104+
case Left((oc, f)) => f.cancel *> oc.embed(poll(F.canceled) *> F.never)
105+
case Right((f, _)) => f.cancel *> f.join.flatMap { oc => oc.embed(fallback) }
106+
}
97107
}
98108

99109
/**
100-
* Returns an effect that either completes with the result of the source within the specified
101-
* time `duration` or otherwise raises a `TimeoutException`.
110+
* Returns an effect that either completes with the result of the source or raises a
111+
* `TimeoutException`.
102112
*
103-
* The source is canceled in the event that it takes longer than the specified time duration
104-
* to complete. Once the source has been successfully canceled (and has completed its
105-
* finalizers), the `TimeoutException` will be raised. If the source is uncancelable, the
106-
* resulting effect will wait for it to complete before raising the exception.
113+
* The source is raced against the timeout `duration`, and its cancelation is triggered if the
114+
* source doesn't complete within the specified time. The resulting effect will always wait
115+
* for the source effect to complete (and to complete its finalizers), and will return the
116+
* source's outcome over raising a `TimeoutException`.
117+
*
118+
* In case source and timeout complete simultaneously, the result of the source will be
119+
* returned over raising a `TimeoutException`.
120+
*
121+
* If the source effect is uncancelable, a `TimeoutException` will never be raised.
107122
*
108123
* @param duration
109-
* The time span for which we wait for the source to complete; in the event that the
110-
* specified time has passed without the source completing, a `TimeoutException` is raised
124+
* The time span for which we wait for the source to complete before triggering its
125+
* cancelation; in the event that the specified time has passed without the source
126+
* completing, a `TimeoutException` is raised
127+
* @see
128+
* [[timeoutAndForget[A](fa:F[A],duration:scala\.concurrent\.duration\.Duration)* timeoutAndForget]]
129+
* for a variant which does not wait for cancelation of the source effect to complete.
111130
*/
112131
def timeout[A](fa: F[A], duration: Duration)(implicit ev: TimeoutException <:< E): F[A] = {
113132
handleDuration(duration, fa)(timeout(fa, _))
114133
}
115134

116135
protected def timeout[A](fa: F[A], duration: FiniteDuration)(
117136
implicit ev: TimeoutException <:< E): F[A] = {
118-
flatMap(race(fa, sleep(duration))) {
119-
case Left(a) => pure(a)
120-
case Right(_) => raiseError[A](ev(new TimeoutException(duration.toString())))
137+
uncancelable { poll =>
138+
implicit val F: GenTemporal[F, E] = this
139+
140+
poll(racePair(fa, sleep(duration))) flatMap {
141+
case Left((oc, f)) => f.cancel *> oc.embed(poll(F.canceled) *> F.never)
142+
case Right((f, _)) =>
143+
f.cancel *> f.join.flatMap { oc =>
144+
oc.embed(raiseError[A](ev(new TimeoutException(duration.toString()))))
145+
}
146+
}
121147
}
122148
}
123149

laws/shared/src/test/scala/cats/effect/laws/GenTemporalSpec.scala

Lines changed: 74 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@ package cats
1818
package effect
1919
package laws
2020

21-
import cats.effect.kernel.Temporal
21+
import cats.effect.kernel.{Outcome, Temporal}
2222
import cats.effect.kernel.testkit.TimeT
2323
import cats.effect.kernel.testkit.pure._
2424
import cats.syntax.all._
2525

2626
import org.specs2.mutable.Specification
2727

28+
import scala.concurrent.TimeoutException
2829
import scala.concurrent.duration._
29-
// import scala.concurrent.TimeoutException
3030

3131
class GenTemporalSpec extends Specification { outer =>
3232

@@ -43,6 +43,40 @@ class GenTemporalSpec extends Specification { outer =>
4343
val fa = F.pure(true)
4444
F.timeout(fa, Duration.Inf) mustEqual fa
4545
}
46+
47+
"succeed on a fast action" in {
48+
val fa: TimeT[F, Boolean] = F.pure(true)
49+
val op = F.timeout(fa, Duration.Zero)
50+
51+
run(TimeT.run(op)) mustEqual Outcome.Succeeded(Some(true))
52+
}
53+
54+
"error out on a slow action" in {
55+
val fa: TimeT[F, Boolean] = F.never *> F.pure(true)
56+
val op = F.timeout(fa, Duration.Zero)
57+
58+
run(TimeT.run(op)) must beLike {
59+
case Outcome.Errored(e) => e must haveClass[TimeoutException]
60+
}
61+
}
62+
63+
"propagate successful outcome of uncancelable action" in {
64+
val fa = F.uncancelable(_ => F.sleep(50.millis) *> F.pure(true))
65+
val op = F.timeout(fa, Duration.Zero)
66+
67+
run(TimeT.run(op)) mustEqual Outcome.Succeeded(Some(true))
68+
}
69+
70+
"propagate errors from uncancelable action" in {
71+
val fa = F.uncancelable { _ =>
72+
F.sleep(50.millis) *> F.raiseError(new RuntimeException("fa failed")) *> F.pure(true)
73+
}
74+
val op = F.timeout(fa, Duration.Zero)
75+
76+
run(TimeT.run(op)) must beLike {
77+
case Outcome.Errored(e: RuntimeException) => e.getMessage mustEqual "fa failed"
78+
}
79+
}
4680
}
4781

4882
"timeoutTo" should {
@@ -51,6 +85,44 @@ class GenTemporalSpec extends Specification { outer =>
5185
val fallback: TimeT[F, Boolean] = F.raiseError(new RuntimeException)
5286
F.timeoutTo(fa, Duration.Inf, fallback) mustEqual fa
5387
}
88+
89+
"succeed on a fast action" in {
90+
val fa: TimeT[F, Boolean] = F.pure(true)
91+
val fallback: TimeT[F, Boolean] = F.raiseError(new RuntimeException)
92+
val op = F.timeoutTo(fa, Duration.Zero, fallback)
93+
94+
run(TimeT.run(op)) mustEqual Outcome.Succeeded(Some(true))
95+
}
96+
97+
"error out on a slow action" in {
98+
val fa: TimeT[F, Boolean] = F.never *> F.pure(true)
99+
val fallback: TimeT[F, Boolean] = F.raiseError(new RuntimeException)
100+
val op = F.timeoutTo(fa, Duration.Zero, fallback)
101+
102+
run(TimeT.run(op)) must beLike {
103+
case Outcome.Errored(e) => e must haveClass[RuntimeException]
104+
}
105+
}
106+
107+
"propagate successful outcome of uncancelable action" in {
108+
val fa = F.uncancelable(_ => F.sleep(50.millis) *> F.pure(true))
109+
val fallback: TimeT[F, Boolean] = F.raiseError(new RuntimeException)
110+
val op = F.timeoutTo(fa, Duration.Zero, fallback)
111+
112+
run(TimeT.run(op)) mustEqual Outcome.Succeeded(Some(true))
113+
}
114+
115+
"propagate errors from uncancelable action" in {
116+
val fa = F.uncancelable { _ =>
117+
F.sleep(50.millis) *> F.raiseError(new RuntimeException("fa failed")) *> F.pure(true)
118+
}
119+
val fallback: TimeT[F, Boolean] = F.raiseError(new RuntimeException)
120+
val op = F.timeoutTo(fa, Duration.Zero, fallback)
121+
122+
run(TimeT.run(op)) must beLike {
123+
case Outcome.Errored(e: RuntimeException) => e.getMessage mustEqual "fa failed"
124+
}
125+
}
54126
}
55127

56128
"timeoutAndForget" should {
@@ -63,13 +135,6 @@ class GenTemporalSpec extends Specification { outer =>
63135

64136
// TODO enable these tests once Temporal for TimeT is fixed
65137
/*"temporal" should {
66-
"timeout" should {
67-
"succeed" in {
68-
val op = F.timeout(F.pure(true), 10.seconds)
69-
70-
run(TimeT.run(op)) mustEqual Succeeded(Some(true))
71-
}.pendingUntilFixed
72-
73138
"cancel a loop" in {
74139
val op: TimeT[F, Either[Throwable, Unit]] = F.timeout(loop, 5.millis).attempt
75140
@@ -80,12 +145,6 @@ class GenTemporalSpec extends Specification { outer =>
80145
}
81146
82147
"timeoutTo" should {
83-
"succeed" in {
84-
val op: TimeT[F, Boolean] = F.timeoutTo(F.pure(true), 5.millis, F.raiseError(new RuntimeException))
85-
86-
run(TimeT.run(op)) mustEqual Succeeded(Some(true))
87-
}.pendingUntilFixed
88-
89148
"use fallback" in {
90149
val op: TimeT[F, Boolean] = F.timeoutTo(loop >> F.pure(false), 5.millis, F.pure(true))
91150

tests/shared/src/test/scala/cats/effect/IOSpec.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1876,6 +1876,23 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification {
18761876
"non-terminate on an uncancelable fiber" in ticked { implicit ticker =>
18771877
IO.never.uncancelable.timeout(1.second) must nonTerminate
18781878
}
1879+
1880+
"propagate successful result from a completed effect" in real {
1881+
IO.pure(true).delayBy(50.millis).uncancelable.timeout(10.millis).map { res =>
1882+
res must beTrue
1883+
}
1884+
}
1885+
1886+
"propagate error from a completed effect" in real {
1887+
IO.raiseError(new RuntimeException)
1888+
.delayBy(50.millis)
1889+
.uncancelable
1890+
.timeout(10.millis)
1891+
.attempt
1892+
.map { res =>
1893+
res must beLike { case Left(e) => e must haveClass[RuntimeException] }
1894+
}
1895+
}
18791896
}
18801897

18811898
"timeoutTo" should {

0 commit comments

Comments
 (0)