Skip to content

Commit 78178fc

Browse files
committed
Handle error propagation / cancelation in Topic
1 parent 09f86e3 commit 78178fc

File tree

3 files changed

+41
-5
lines changed

3 files changed

+41
-5
lines changed

core/shared/src/main/scala/fs2/concurrent/Channel.scala

+18
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,18 @@ sealed trait Channel[F[_], A] {
117117
*/
118118
def closeWithElement(a: A): F[Either[Channel.Closed, Unit]]
119119

120+
/** Raises an error, closing the channel with an error state.
121+
*
122+
* No-op if the channel is closed, see [[close]] for further info.
123+
*/
124+
def raiseError(e: Throwable): F[Either[Channel.Closed, Unit]]
125+
126+
/** Cancels the channel, closing it with a canceled state.
127+
*
128+
* No-op if the channel is closed, see [[close]] for further info.
129+
*/
130+
def cancel: F[Either[Channel.Closed, Unit]]
131+
120132
/** Returns true if this channel is closed */
121133
def isClosed: F[Boolean]
122134

@@ -216,6 +228,12 @@ object Channel {
216228
)
217229
}
218230

231+
def raiseError(e: Throwable): F[Either[Closed, Unit]] =
232+
closeWithExitCase(ExitCase.Errored(e))
233+
234+
def cancel: F[Either[Closed, Unit]] =
235+
closeWithExitCase(ExitCase.Canceled)
236+
219237
def isClosed = closedGate.tryGet.map(_.isDefined)
220238

221239
def closed = closedGate.get

core/shared/src/main/scala/fs2/concurrent/Topic.scala

+15-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ package fs2
2323
package concurrent
2424

2525
import cats.effect._
26+
import cats.effect.Resource.ExitCase
2627
import cats.effect.implicits._
2728
import cats.syntax.all._
2829
import scala.collection.immutable.LongMap
@@ -208,7 +209,8 @@ object Topic {
208209
}
209210

210211
def publish: Pipe[F, A, Nothing] = { in =>
211-
in.onFinalize(close.void)
212+
in
213+
.onFinalizeCase(closeWithExitCase(_).void)
212214
.evalMap(publish1)
213215
.takeWhile(_.isRight)
214216
.drain
@@ -223,13 +225,24 @@ object Topic {
223225
def subscribers: Stream[F, Int] = subscriberCount.discrete
224226

225227
def close: F[Either[Topic.Closed, Unit]] =
228+
closeWithExitCase(ExitCase.Succeeded)
229+
230+
def closeWithExitCase(exitCase: ExitCase): F[Either[Closed, Unit]] =
226231
signalClosure
227232
.complete(())
228233
.flatMap { completedNow =>
229234
val result = if (completedNow) Topic.rightUnit else Topic.closed
230235

231236
state.get
232-
.flatMap { case (subs, _) => foreach(subs)(_.close.void) }
237+
.flatMap { case (subs, _) =>
238+
foreach(subs)(channel =>
239+
exitCase match {
240+
case ExitCase.Succeeded => channel.close.void
241+
case ExitCase.Errored(e) => channel.raiseError(e).void
242+
case ExitCase.Canceled => channel.cancel.void
243+
}
244+
)
245+
}
233246
.as(result)
234247
}
235248
.uncancelable

core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala

+8-3
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,14 @@
2222
package fs2
2323
package concurrent
2424

25-
import cats.syntax.all._
25+
import cats.syntax.all.*
2626
import cats.effect.IO
27-
import scala.concurrent.duration._
27+
28+
import scala.concurrent.duration.*
2829
import cats.effect.testkit.TestControl
2930

31+
import scala.concurrent.CancellationException
32+
3033
class TopicSuite extends Fs2Suite {
3134
test("subscribers see all elements published") {
3235
Topic[IO, Int].flatMap { topic =>
@@ -204,6 +207,8 @@ class TopicSuite extends Fs2Suite {
204207
.drain
205208
}
206209

207-
TestControl.executeEmbed(program) // will fail if program is deadlocked
210+
TestControl
211+
.executeEmbed(program)
212+
.intercept[CancellationException]
208213
}
209214
}

0 commit comments

Comments
 (0)