Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 65 additions & 42 deletions core/shared/src/main/scala/fs2/concurrent/Topic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package fs2
package concurrent

import cats.effect._
import cats.effect.implicits._
import cats.syntax.all._
import scala.collection.immutable.LongMap

Expand Down Expand Up @@ -151,7 +150,7 @@ object Topic {
/** Constructs a Topic */
def apply[F[_], A](implicit F: Concurrent[F]): F[Topic[F, A]] =
(
F.ref(LongMap.empty[Channel[F, A]] -> 1L),
F.ref(State.initial[F, A]),
SignallingRef[F, Int](0),
F.deferred[Unit]
).mapN { case (state, subscriberCount, signalClosure) =>
Expand All @@ -161,11 +160,11 @@ object Topic {
lm.foldLeft(F.unit) { case (op, (_, b)) => op >> f(b) }

def publish1(a: A): F[Either[Topic.Closed, Unit]] =
signalClosure.tryGet.flatMap {
case Some(_) => Topic.closed.pure[F]
case None =>
state.get
.flatMap { case (subs, _) => foreach(subs)(_.send(a).void) }
state.get.flatMap {
case State.Closed() =>
Topic.closed.pure[F]
case State.Active(subs, _) =>
foreach(subs)(_.send(a).void)
.as(Topic.rightUnit)
}

Expand All @@ -180,31 +179,44 @@ object Topic {
.flatMap(subscribeAwaitImpl)

def subscribeAwaitImpl(chan: Channel[F, A]): Resource[F, Stream[F, A]] = {
val subscribe = state.modify { case (subs, id) =>
(subs.updated(id, chan), id + 1) -> id
} <* subscriberCount.update(_ + 1)

def unsubscribe(id: Long) =
state.modify { case (subs, nextId) =>
// _After_ we remove the bounded channel for this
// subscriber, we need to drain it to unblock to
// publish loop which might have already enqueued
// something.
def drainChannel: F[Unit] =
subs.get(id).traverse_ { chan =>
chan.close >> chan.stream.compile.drain
}

(subs - id, nextId) -> drainChannel
}.flatten >> subscriberCount.update(_ - 1)

Resource.eval(signalClosure.tryGet).flatMap {
case Some(_) => Resource.pure(Stream.empty)
case None =>
Copy link
Author

@TomasMikula TomasMikula Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's the original race condition: signalClosure.tryGet returns None, but by the time we subscribe the Topic might have been closed and subscription Stream never terminates.

Fixed by making the state Ref able to determine closure. I.e., check and update a single Ref, instead of checking a Deferred and updating a Ref.

Resource
.make(subscribe)(unsubscribe)
.as(chan.stream)
}
val subscribe: F[Option[Long]] =
state.flatModify {
case State.Active(subs, nextId) =>
val newState = State.Active(subs.updated(nextId, chan), nextId + 1)
val action = subscriberCount.update(_ + 1)
val result = Some(nextId)
newState -> action.as(result)
case closed @ State.Closed() =>
closed -> F.pure(None)
}

def unsubscribe(id: Long): F[Unit] =
state.flatModify {
case State.Active(subs, nextId) =>
// _After_ we remove the bounded channel for this
// subscriber, we need to drain it to unblock to
// publish loop which might have already enqueued
// something.
def drainChannel: F[Unit] =
subs.get(id).traverse_ { chan =>
chan.close >> chan.stream.compile.drain
}

State.Active(subs - id, nextId) -> (drainChannel *> subscriberCount.update(_ - 1))

case closed @ State.Closed() =>
closed -> F.unit
}

Resource
.make(subscribe) {
case Some(id) => unsubscribe(id)
case None => F.unit
}
.map {
case Some(_) => chan.stream
case None => Stream.empty
}
}

def publish: Pipe[F, A, Nothing] = { in =>
Expand All @@ -223,22 +235,33 @@ object Topic {
def subscribers: Stream[F, Int] = subscriberCount.discrete

def close: F[Either[Topic.Closed, Unit]] =
signalClosure
.complete(())
.flatMap { completedNow =>
val result = if (completedNow) Topic.rightUnit else Topic.closed

state.get
.flatMap { case (subs, _) => foreach(subs)(_.close.void) }
.as(result)
}
.uncancelable
state.flatModify {
case State.Active(subs, _) =>
val action = foreach(subs)(_.close.void) *> signalClosure.complete(())
(State.Closed(), action.as(Topic.rightUnit))
case closed @ State.Closed() =>
(closed, Topic.closed.pure[F])
}

def closed: F[Unit] = signalClosure.get
def isClosed: F[Boolean] = signalClosure.tryGet.map(_.isDefined)
}
}

private sealed trait State[F[_], A]

private object State {
case class Active[F[_], A](
subscribers: LongMap[Channel[F, A]],
nextId: Long
) extends State[F, A]

case class Closed[F[_], A]() extends State[F, A]

def initial[F[_], A]: State[F, A] =
Active(LongMap.empty, 1L)
}

private final val closed: Either[Closed, Unit] = Left(Closed)
private final val rightUnit: Either[Closed, Unit] = Right(())
}
35 changes: 35 additions & 0 deletions core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -185,4 +185,39 @@ class TopicSuite extends Fs2Suite {

TestControl.executeEmbed(program) // will fail if program is deadlocked
}

// https://github.com/typelevel/fs2/issues/3642
test("subscribe and close concurrently") {
val check: IO[Unit] =
for {
t <- Topic[IO, Int]
fiber <- t
.subscribe(maxQueued = 1)
.compile
.toList
.start // let the subscription race with closing
_ <- t.close
_ <- fiber.join.timeout(5.seconds) // checking termination of the subscription stream
} yield ()

check.replicateA_(10000)
}

// https://github.com/typelevel/fs2/issues/3642
test("subscribeAwait and close concurrently") {
val check: IO[Unit] =
for {
t <- Topic[IO, Int]
fiber <- Stream
.resource(t.subscribeAwait(maxQueued = 1))
.flatten
.compile
.toList
.start // let the subscription race with closing
_ <- t.close
_ <- fiber.join.timeout(5.seconds) // checking termination of the subscription stream
} yield ()

check.replicateA_(10000)
}
}