Skip to content

Commit f504097

Browse files
authored
Merge pull request #2768 from armanbilge/topic/channel-queue-sink
Add `Channel#trySend`
2 parents 98e57d3 + 2ca8783 commit f504097

File tree

3 files changed

+51
-8
lines changed

3 files changed

+51
-8
lines changed

build.sbt

+2-1
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,8 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
158158
ProblemFilters.exclude[DirectMissingMethodProblem](
159159
"fs2.compression.Compression.gunzip$default$1$"
160160
),
161-
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.ChunkCompanionPlatform.makeArrayBuilder")
161+
ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.ChunkCompanionPlatform.makeArrayBuilder"),
162+
ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.concurrent.Channel.trySend")
162163
)
163164

164165
lazy val root = tlCrossRootProject

core/shared/src/main/scala/fs2/concurrent/Channel.scala

+36-7
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,17 @@ sealed trait Channel[F[_], A] {
4545
*/
4646
def send(a: A): F[Either[Channel.Closed, Unit]]
4747

48+
/** Attempts to send an element through this channel, and indicates if
49+
* it succeeded (`true`) or not (`false`).
50+
*
51+
* It can be called concurrently by multiple producers, and it may
52+
* not succeed if the channel is bounded or synchronous. It will
53+
* never semantically block.
54+
*
55+
* No-op if the channel is closed, see [[close]] for further info.
56+
*/
57+
def trySend(a: A): F[Either[Channel.Closed, Boolean]]
58+
4859
/** The stream of elements sent through this channel.
4960
* It terminates if [[close]] is called and all elements in the channel
5061
* have been emitted (see [[close]] for futher info).
@@ -145,33 +156,48 @@ object Channel {
145156
F.uncancelable { poll =>
146157
state.modify {
147158
case s @ State(_, _, _, _, closed @ true) =>
148-
(s, Channel.closed.pure[F])
159+
(s, Channel.closed[Unit].pure[F])
149160

150161
case State(values, size, waiting, producers, closed @ false) =>
151162
if (size < capacity)
152163
(
153164
State(a :: values, size + 1, None, producers, false),
154-
notifyStream(waiting)
165+
notifyStream(waiting).as(rightUnit)
155166
)
156167
else
157168
(
158169
State(values, size, None, (a, producer) :: producers, false),
159-
notifyStream(waiting) <* waitOnBound(producer, poll)
170+
notifyStream(waiting).as(rightUnit) <* waitOnBound(producer, poll)
160171
)
161172
}.flatten
162173
}
163174
}
164175

176+
def trySend(a: A) =
177+
state.modify {
178+
case s @ State(_, _, _, _, closed @ true) =>
179+
(s, Channel.closed[Boolean].pure[F])
180+
181+
case s @ State(values, size, waiting, producers, closed @ false) =>
182+
if (size < capacity)
183+
(
184+
State(a :: values, size + 1, None, producers, false),
185+
notifyStream(waiting).as(rightTrue)
186+
)
187+
else
188+
(s, rightFalse.pure[F])
189+
}.flatten
190+
165191
def close =
166192
state
167193
.modify {
168194
case s @ State(_, _, _, _, closed @ true) =>
169-
(s, Channel.closed.pure[F])
195+
(s, Channel.closed[Unit].pure[F])
170196

171197
case State(values, size, waiting, producers, closed @ false) =>
172198
(
173199
State(values, size, None, producers, true),
174-
notifyStream(waiting) <* signalClosure
200+
notifyStream(waiting).as(rightUnit) <* signalClosure
175201
)
176202
}
177203
.flatten
@@ -219,7 +245,7 @@ object Channel {
219245
}.flatten
220246

221247
def notifyStream(waitForChanges: Option[Deferred[F, Unit]]) =
222-
waitForChanges.traverse(_.complete(())).as(rightUnit)
248+
waitForChanges.traverse(_.complete(()))
223249

224250
def waitOnBound(producer: Deferred[F, Unit], poll: Poll[F]) =
225251
poll(producer.get).onCancel {
@@ -248,6 +274,9 @@ object Channel {
248274
}
249275

250276
// allocate once
251-
private final val closed: Either[Closed, Unit] = Left(Closed)
277+
@inline private final def closed[A]: Either[Closed, A] = _closed
278+
private[this] final val _closed: Either[Closed, Nothing] = Left(Closed)
252279
private final val rightUnit: Either[Closed, Unit] = Right(())
280+
private final val rightTrue: Either[Closed, Boolean] = Right(true)
281+
private final val rightFalse: Either[Closed, Boolean] = Right(false)
253282
}

core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala

+13
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,19 @@ class ChannelSuite extends Fs2Suite {
9494
p.assertEquals(v)
9595
}
9696

97+
test("trySend does not block") {
98+
val v = Vector(1, 2, 3, 4)
99+
val capacity = 3
100+
val p = for {
101+
chan <- Channel.bounded[IO, Int](capacity)
102+
_ <- v.traverse(chan.trySend)
103+
_ <- chan.close
104+
res <- chan.stream.chunks.take(1).compile.lastOrError
105+
} yield res.toVector
106+
107+
p.assertEquals(v.take(capacity))
108+
}
109+
97110
test("Timely closure") {
98111
val v = Vector(1, 2, 3)
99112
val p = for {

0 commit comments

Comments
 (0)