Skip to content

Commit e3cb49a

Browse files
Fixed ChainedFiniteStream/Signal
1 parent db136d5 commit e3cb49a

3 files changed

Lines changed: 26 additions & 36 deletions

File tree

src/main/scala/io/github/makingthematrix/signals3/Finite.scala

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ object Finite {
5252
first.subscribe(this)
5353
first.onClose {
5454
second.subscribe(this)
55-
first.unsubscribe(this)
5655
}
5756
} else {
5857
second.subscribe(this)
@@ -79,7 +78,6 @@ object Finite {
7978
first.subscribe(this)
8079
first.onClose {
8180
second.subscribe(this)
82-
first.unsubscribe(this)
8381
}
8482
}
8583
else {
@@ -96,18 +94,19 @@ object Finite {
9694

9795
@static final private[signals3] class ChainedFiniteSignal[V, W <: V](first: FiniteSignal[V], second: => FiniteSignal[W])
9896
extends Signal[V] with Finite[V] with SignalSubscriber {
97+
9998
override protected[signals3] def onWire(): Unit =
10099
if (!first.isClosed) {
101100
first.subscribe(this)
102101
first.onClose {
103102
second.subscribe(this)
104-
first.unsubscribe(this)
103+
lastPromise.foreach(_.completeWith(second.last))
104+
second.onClose { close() }
105105
}
106106
} else if (!second.isClosed) {
107107
second.subscribe(this)
108-
second.onClose {
109-
second.subscribe(this)
110-
}
108+
lastPromise.foreach(_.completeWith(second.last))
109+
second.onClose {close()}
111110
}
112111

113112
override protected[signals3] def onUnwire(): Unit =
@@ -120,13 +119,8 @@ object Finite {
120119
else current
121120
}
122121

123-
override protected[signals3] def changed(currentContext: Option[ExecutionContext]): Unit = {
124-
update(computeValue, currentContext)
125-
if (first.isClosed && second.isClosed && !isClosed) {
126-
close()
127-
lastPromise.foreach(_.completeWith(second.last))
128-
}
129-
}
122+
override protected[signals3] def changed(currentContext: Option[ExecutionContext]): Unit =
123+
if (!isClosed) update(computeValue, currentContext)
130124
}
131125

132126
@static final private[signals3] class ChainedFiniteStream[E](first: FiniteStream[E], second: => FiniteStream[E])
@@ -136,25 +130,20 @@ object Finite {
136130
first.subscribe(this)
137131
first.onClose {
138132
second.subscribe(this)
139-
first.unsubscribe(this)
133+
lastPromise.foreach(_.completeWith(second.last))
134+
second.onClose {close()}
140135
}
141136
} else if (!second.isClosed) {
142137
second.subscribe(this)
143-
second.onClose {
144-
second.subscribe(this)
145-
}
138+
lastPromise.foreach(_.completeWith(second.last))
139+
second.onClose {close()}
146140
}
147141

148142
override protected[signals3] def onUnwire(): Unit =
149143
if (!first.isClosed) first.unsubscribe(this)
150144
else if (!second.isClosed) second.unsubscribe(this)
151145

152-
override protected[signals3] def onEvent(event: E, currentContext: Option[ExecutionContext]): Unit = if (!isClosed) {
153-
dispatch(event, currentContext)
154-
if (first.isClosed && second.isClosed && !isClosed) {
155-
close()
156-
lastPromise.foreach(_.completeWith(second.last))
157-
}
158-
}
146+
override protected[signals3] def onEvent(event: E, currentContext: Option[ExecutionContext]): Unit =
147+
if (!isClosed) dispatch(event, currentContext)
159148
}
160149
}

src/main/scala/io/github/makingthematrix/signals3/TakeStream.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import scala.util.{Failure, Success}
1717
* @param take The number of events to take.
1818
* @tparam E The type of the events emitted by the stream.
1919
*/
20-
class TakeStream[E](source: Stream[E], take: Int)
20+
final class TakeStream[E](source: Stream[E], take: Int)
2121
extends IndexedStream[E](source) with Finite[E] {
2222

2323
override def isClosed: Boolean = super.isClosed || counter >= take

src/test/scala/io/github/makingthematrix/signals3/FiniteSpec.scala

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,11 @@ class FiniteSpec extends munit.FunSuite {
110110
}
111111

112112
test("Chain finite signals with >>> operator") {
113-
val a: SourceStream[Int] = Stream()
114-
val b: FiniteStream[Int] = a.take(3)
115-
val c: SourceStream[Int] = Stream()
116-
val d: FiniteStream[Int] = c.take(2)
117-
val e: FiniteStream[Int] = b >>> d
113+
val a: SourceSignal[Int] = Signal()
114+
val b: FiniteSignal[Int] = a.take(3)
115+
val c: SourceSignal[Int] = Signal()
116+
val d: FiniteSignal[Int] = c.take(2)
117+
val e: FiniteSignal[Int] = b >>> d
118118

119119
var lastValue = 0
120120
e.last.foreach(lastValue = _)
@@ -132,17 +132,18 @@ class FiniteSpec extends munit.FunSuite {
132132
assertEquals(res.reverse, List(1, 2))
133133

134134
a ! 3
135-
waitFor(d, 3)
135+
waitFor(e, 3)
136136
assert(b.isClosed)
137-
assertEquals(res.reverse, List(1, 2, 3))
137+
assertEquals(res.reverse, List(1, 2, 3, 20))
138138

139139
a ! 4
140140
c ! 30
141-
c ! 40
142-
awaitAllTasks
143-
assertEquals(res.reverse, Seq(1, 2, 3, 30, 40))
141+
waitFor(e, 30)
144142
assert(d.isClosed)
145143
assert(e.isClosed)
146-
assertEquals(lastValue, 40)
144+
c ! 40
145+
awaitAllTasks
146+
assertEquals(res.reverse, Seq(1, 2, 3, 20, 30))
147+
assertEquals(lastValue, 30)
147148
}
148149
}

0 commit comments

Comments
 (0)