Skip to content

Commit c6d1146

Browse files
committed
Merge remote-tracking branch 'refs/remotes/origin/issue/3182-jdk-unix-sockets' into issue/3182-jdk-unix-sockets
2 parents 773351b + c381fd8 commit c6d1146

File tree

2 files changed

+65
-3
lines changed

2 files changed

+65
-3
lines changed

core/shared/src/main/scala/fs2/concurrent/Signal.scala

+51-3
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,11 @@ import cats.effect.std.MapRef
2828
import cats.effect.syntax.all._
2929
import cats.syntax.all._
3030
import cats.{Applicative, Functor, Invariant, Monad}
31-
31+
import cats.arrow.FunctionK
3232
import scala.collection.immutable.LongMap
33+
import fs2.concurrent.SignallingRef.TransformedSignallingRef
34+
import fs2.concurrent.Signal.TransformedSignal
35+
import cats.data.State
3336

3437
/** Pure holder of a single value of type `A` that can be read in the effect `F`. */
3538
trait Signal[F[_], A] { outer =>
@@ -135,6 +138,11 @@ trait Signal[F[_], A] { outer =>
135138
*/
136139
def waitUntil(p: A => Boolean)(implicit F: Concurrent[F]): F[Unit] =
137140
discrete.forall(a => !p(a)).compile.drain
141+
142+
def mapK[G[_]](
143+
f: FunctionK[F, G]
144+
): Signal[G, A] =
145+
new TransformedSignal(this, f)
138146
}
139147

140148
object Signal extends SignalInstances {
@@ -162,6 +170,16 @@ object Signal extends SignalInstances {
162170
def get: F[B] = Functor[F].map(fa.get)(f)
163171
}
164172

173+
final private class TransformedSignal[F[_], G[_], A](
174+
underlying: Signal[F, A],
175+
trans: FunctionK[F, G]
176+
) extends Signal[G, A] {
177+
override def get: G[A] = trans(underlying.get)
178+
override def discrete: Stream[G, A] = underlying.discrete.translate(trans)
179+
override def continuous: Stream[G, A] = underlying.continuous.translate(trans)
180+
override def changes(implicit eqA: Eq[A]): Signal[G, A] = underlying.changes.mapK(trans)
181+
}
182+
165183
implicit class SignalOps[F[_], A](val self: Signal[F, A]) extends AnyVal {
166184

167185
/** Converts this signal to signal of `B` by applying `f`.
@@ -196,7 +214,12 @@ object Signal extends SignalInstances {
196214
* function, in the presence of `discrete`, can return `false` and
197215
* need looping even without any other writers.
198216
*/
199-
abstract class SignallingRef[F[_], A] extends Ref[F, A] with Signal[F, A]
217+
abstract class SignallingRef[F[_], A] extends Ref[F, A] with Signal[F, A] {
218+
def mapK[G[_]](
219+
f: FunctionK[F, G]
220+
)(implicit G: Functor[G], dummy: DummyImplicit): SignallingRef[G, A] =
221+
new TransformedSignallingRef(this, f)
222+
}
200223

201224
object SignallingRef {
202225

@@ -222,6 +245,7 @@ object SignallingRef {
222245
*
223246
* @see [[of]]
224247
*/
248+
225249
def apply[F[_]]: PartiallyApplied[F] = new PartiallyApplied[F]
226250

227251
/** Alias for `of`. */
@@ -341,7 +365,31 @@ object SignallingRef {
341365
ref: SignallingRef[F, A]
342366
)(get: A => B, set: A => B => A)(implicit F: Functor[F]): SignallingRef[F, B] =
343367
new LensSignallingRef(ref)(get, set)
344-
368+
final private class TransformedSignallingRef[F[_], G[_], A](
369+
underlying: SignallingRef[F, A],
370+
trans: FunctionK[F, G]
371+
)(implicit G: Functor[G])
372+
extends SignallingRef[G, A] {
373+
374+
// --- Ref methods: these are lifted using trans, just like in TransformedRef2
375+
override def get: G[A] = trans(underlying.get)
376+
override def set(a: A): G[Unit] = trans(underlying.set(a))
377+
override def getAndSet(a: A): G[A] = trans(underlying.getAndSet(a))
378+
override def tryUpdate(f: A => A): G[Boolean] = trans(underlying.tryUpdate(f))
379+
override def tryModify[B](f: A => (A, B)): G[Option[B]] = trans(underlying.tryModify(f))
380+
override def update(f: A => A): G[Unit] = trans(underlying.update(f))
381+
override def modify[B](f: A => (A, B)): G[B] = trans(underlying.modify(f))
382+
override def tryModifyState[B](state: State[A, B]): G[Option[B]] =
383+
trans(underlying.tryModifyState(state))
384+
override def modifyState[B](state: State[A, B]): G[B] = trans(underlying.modifyState(state))
385+
override def access: G[(A, A => G[Boolean])] =
386+
G.compose[(A, *)].compose[A => *].map(trans(underlying.access))(trans(_))
387+
388+
// --- Signal-specific methods
389+
override def discrete: Stream[G, A] = underlying.discrete.translate(trans)
390+
override def continuous: Stream[G, A] = underlying.continuous.translate(trans)
391+
override def changes(implicit eqA: Eq[A]): Signal[G, A] = underlying.changes.mapK(trans)
392+
}
345393
private final class LensSignallingRef[F[_], A, B](underlying: SignallingRef[F, A])(
346394
lensGet: A => B,
347395
lensSet: A => B => A

core/shared/src/test/scala/fs2/concurrent/SignalSuite.scala

+14
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ package concurrent
2525
import cats.effect.IO
2626
import cats.effect.kernel.Ref
2727
import cats.syntax.all._
28+
import cats.arrow.FunctionK
2829
import cats.effect.testkit.TestControl
2930
// import cats.laws.discipline.{ApplicativeTests, FunctorTests}
3031
import scala.concurrent.duration._
@@ -320,6 +321,19 @@ class SignalSuite extends Fs2Suite {
320321
TestControl.executeEmbed(prog).assertEquals(expected)
321322
}
322323

324+
test("SignallingRef#mapK returns a SignallingRef") {
325+
for {
326+
s <- SignallingRef[IO, Int](0)
327+
nt = new FunctionK[IO, IO] {
328+
def apply[A](fa: IO[A]): IO[A] = fa
329+
}
330+
transformed: SignallingRef[IO, Int] = s.mapK(nt)
331+
} yield assert(
332+
transformed.isInstanceOf[SignallingRef[IO, Int]],
333+
s"Expected transformed to be a SignallingRef but got: ${transformed.getClass.getName}"
334+
)
335+
}
336+
323337
// TODO - Port laws tests once we have a compatible version of cats-laws
324338
// /**
325339
// * This is unsafe because the Signal created cannot have multiple consumers

0 commit comments

Comments
 (0)