Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
0fa166f
FallbackStrategy (initial commit)
makingthematrix Apr 13, 2026
a04c109
FallbackStrategy in SourceStream
makingthematrix Apr 13, 2026
93a6e5e
FallbackStrategy handled in collect and filter -- tests needed
makingthematrix Apr 14, 2026
a19043d
Remove CLOSE from strategies
makingthematrix Apr 20, 2026
6f28624
merging two versions of dispatch and publish
makingthematrix Apr 20, 2026
72a8d59
more unit tests for FallbackStrategy
makingthematrix Apr 20, 2026
94aef97
removing UseDefault fallback strategy
makingthematrix Apr 24, 2026
3bef219
a few more changes to proxy streams
makingthematrix Apr 24, 2026
45a9768
Simple .ignoreExceptions and .rethrowExceptions methods for fallback …
makingthematrix Apr 24, 2026
c3fbeae
Switch between Ignore and Rethrow fallback strategies for custom retr…
makingthematrix Apr 26, 2026
61622f7
.recover and .recoverWith
makingthematrix May 3, 2026
9fb83de
Unit tests for recover
makingthematrix May 3, 2026
64d27be
Removing the old fallback strategies
makingthematrix May 3, 2026
0a5cbd7
Add AGENTS.md
makingthematrix May 3, 2026
8b365d9
Stream.withDefault
makingthematrix May 3, 2026
7545a96
.recover and friends in SourceStream now return SourceStream
makingthematrix May 8, 2026
27deb02
recover and friends for Signal
makingthematrix May 15, 2026
5fced72
Fixed unit tests for RecoverSignalSpec
makingthematrix May 15, 2026
18ba063
.recover and friends for SourceSignal
makingthematrix May 16, 2026
e87b732
New Transformers methods + test + debugging DropSignal
makingthematrix May 17, 2026
8315aff
Update documentation
makingthematrix May 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@
*.svg
*.bloop*
*.metals*
*.bsp
*.bsp
.ai
36 changes: 36 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# AGENTS.md

## Project Overview
This is a Scala project implementing a lightweight event streaming library.

## Repository Structure
- `src/main/scala/io/github/makingthematrix/signals3/`: Core implementation
- `src/test/scala/io/github/makingthematrix/signals3/`: Unit tests

## Development Workflow
1. Clone the repository: `git clone https://github.com/makingthematrix/signals3.git`.
2. Run tests: `sbt test`.
3. Run the project: `sbt run`.

## Coding Standards
- Use Scala 3 with braces syntax.

## GitHub Workflow
- Create a new branch for each feature or bug fix.
- Use `feature/xxx` for new features.
- Use `bugfix/xxx` for bug fixes.
- Commit changes with `git commit -am "[MESSAGE]"`
- Push changes to the branch with `git push`
- Don't merge the branch to the main branch directly.

## AI-Specific Guidelines
- Read Scaladoc comments attached to every class and method.
- Always run `sbt test` before committing.
- Do not modify files in `target/`.
- Prefer functional programming patterns.
- Avoid modifying the `build.sbt` file unless necessary.
- Use `sbt` for all build tasks.
- Some unit tests deal with concurrency and can be flaky. If a unit test that is not directly related to the feature you work on fails, rerun it (but only once).

## Contacts
- Maintainer: [@makingthematrix](https://github.com/makingthematrix)
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ lazy val root = (project in file("."))
name := "signals3",
libraryDependencies ++= Seq(
//Test dependencies
"org.scalameta" %% "munit" % "1.2.2" % "test"
"org.scalameta" %% "munit" % "1.3.0" % "test"
),
scalacOptions ++= standardOptions ++ scala3Options
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ package io.github.makingthematrix.signals3
* they're no longer needed, but you can make any new stream or signal inherit [[Closeable]] and implement the required
* logic.
* [[Closeable]] extends [[java.lang.AutoCloseable]] so in theory it can be used in Java `try-with-resources`.
*
*
*
* @see [[ProxyStream]] and [[ProxySignal]] for examples.
*/
trait Closeable extends java.lang.AutoCloseable with CanBeClosed {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.github.makingthematrix.signals3

import io.github.makingthematrix.signals3.Stream.EventSubscriber
import scala.concurrent.ExecutionContext
import scala.util.chaining.scalaUtilChainingOps

protected[signals3] final class FlatMapStream[E, V](source: Stream[E], f: E => Stream[V])
extends Stream[V] with EventSubscriber[E]{
@volatile private var mapped: Option[Stream[V]] = None

private val subscriber = new EventSubscriber[V]{
override protected[signals3] def onEvent(event: V, currentContext: Option[ExecutionContext]): Unit =
dispatch(event, currentContext)
}

override protected[signals3] def onEvent(event: E, currentContext: Option[ExecutionContext]): Unit = {
mapped.foreach(_.unsubscribe(subscriber))
mapped = Some(f(event).tap(_.subscribe(subscriber)))
}

override protected def onWire(): Unit = source.subscribe(this)

override protected def onUnwire(): Unit = {
mapped.foreach(_.unsubscribe(subscriber))
mapped = None
source.unsubscribe(this)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,29 @@ abstract private[signals3] class ProxySignal[V](sources: Signal[?]*) extends Sig
}

private[signals3] object ProxySignal {
final class ScanSignal[V, Z](source: Signal[V], zero: Z, f: (Z, V) => Z) extends ProxySignal[Z](source) {
class RecoverSignal[V](source: Signal[V], recover: Throwable => Option[V])
extends ProxySignal[V](source) {

override protected def computeValue(current: Option[V]): Option[V] = source.value

override def changed(ec: Option[ExecutionContext]): Unit =
try update(computeValue, ec) catch {
case t: Throwable => updateWith(recover(t), ec)
}
}

class RecoverWithSignal[V](source: Signal[V], recoverWith: PartialFunction[Throwable, Option[V]])
extends ProxySignal[V](source) {

override protected def computeValue(current: Option[V]): Option[V] = source.value

override def changed(ec: Option[ExecutionContext]): Unit =
try update(computeValue, ec) catch {
case t: Throwable if recoverWith.isDefinedAt(t) => updateWith(recoverWith(t), ec)
}
}

class ScanSignal[V, Z](source: Signal[V], zero: Z, f: (Z, V) => Z) extends ProxySignal[Z](source) {
value = Some(zero)

override protected def computeValue(current: Option[Z]): Option[Z] =
Expand All @@ -32,7 +54,7 @@ private[signals3] object ProxySignal {
override protected def computeValue(current: Option[Z]): Option[Z] = source.value.map(f)
}

final class GroupedSignal[V](source: Signal[V], n: Int) extends ProxySignal[Seq[V]](source) {
class GroupedSignal[V](source: Signal[V], n: Int) extends ProxySignal[Seq[V]](source) {
require(n > 0, "n must be positive")
private val buffer = scala.collection.mutable.ArrayBuffer.empty[V]

Expand All @@ -47,7 +69,7 @@ private[signals3] object ProxySignal {
}
}

final class GroupBySignal[V](source: Signal[V], groupBy: V => Boolean) extends ProxySignal[Seq[V]](source) {
class GroupBySignal[V](source: Signal[V], groupBy: V => Boolean) extends ProxySignal[Seq[V]](source) {
private val buffer = scala.collection.mutable.ArrayBuffer.empty[V]

override protected def computeValue(current: Option[Seq[V]]): Option[Seq[V]] = {
Expand All @@ -64,7 +86,6 @@ private[signals3] object ProxySignal {
}
}


class IndexedSignal[V](source: Signal[V]) extends ProxySignal[V](source) with Indexed {
value = source.value

Expand All @@ -74,7 +95,9 @@ private[signals3] object ProxySignal {
}
}

final class DropSignal[V](source: Signal[V], drop: Int) extends IndexedSignal[V](source) {
class DropSignal[V](source: Signal[V], drop: Int) extends ProxySignal[V](source) with Indexed {
require(drop > 0, "drop must be positive")
value = None
override protected def computeValue(current: Option[V]): Option[V] = {
val c = if (source.value != current) incAndGet() else counter
if (c > drop) source.value else current
Expand All @@ -101,7 +124,7 @@ private[signals3] object ProxySignal {
else source.value
}

final class DropWhileSignal[V](source: Signal[V], p: V => Boolean) extends ProxySignal[V](source) {
class DropWhileSignal[V](source: Signal[V], p: V => Boolean) extends ProxySignal[V](source) {
@volatile private var dropping = true
override protected def computeValue(current: Option[V]): Option[V] = {
if (dropping && source.value != current) dropping = source.value.exists(p)
Expand Down
98 changes: 47 additions & 51 deletions src/main/scala/io/github/makingthematrix/signals3/ProxyStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,15 @@ import io.github.makingthematrix.signals3.Stream.EventSubscriber

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future}
import scala.util.chaining.scalaUtilChainingOps
import scala.util.{Failure, Success}


/** A superclass for all event streams which compose other event streams into one.
*
* @param sources A variable arguments list of event streams serving as sources of events for the resulting stream.
* @tparam A The type of the events emitted by all the source streams.
* @tparam E The type of the events emitted by the stream constructed from the sources.
*/
abstract private[signals3] class ProxyStream[A, E](sources: Stream[A]*)
extends Stream[E] with EventSubscriber[A] {
abstract private[signals3] class ProxyStream[A, E](sources: Stream[A]*) extends Stream[E] with EventSubscriber[A] {
/** When the first subscriber is registered in this stream, subscribe the stream to all its sources. */
override protected[signals3] def onWire(): Unit = sources.foreach(_.subscribe(this))

Expand All @@ -24,43 +21,64 @@ abstract private[signals3] class ProxyStream[A, E](sources: Stream[A]*)
}

private[signals3] object ProxyStream {
class RecoverStream[E](source: Stream[E], recover: Throwable => Option[E])
extends ProxyStream[E, E](source) {
override protected[signals3] def onEvent(event: E, sourceContext: Option[ExecutionContext]): Unit =
try dispatch(event, sourceContext) catch {
case t: Throwable => recover(t).foreach(dispatch(_, sourceContext))
}
}

class RecoverWithStream[E](source: Stream[E], recoverWith: PartialFunction[Throwable, Option[E]])
extends ProxyStream[E, E](source) {
override protected[signals3] def onEvent(event: E, sourceContext: Option[ExecutionContext]): Unit =
try dispatch(event, sourceContext) catch {
case t: Throwable if recoverWith.isDefinedAt(t) =>
recoverWith(t).foreach(dispatch(_, sourceContext))
}
}

class MapStream[E, V](source: Stream[E], f: E => V) extends ProxyStream[E, V](source) {
override protected[signals3] def onEvent(event: E, sourceContext: Option[ExecutionContext]): Unit =
dispatch(f(event), sourceContext)
}

class FutureStream[E, V](source: Stream[E], f: E => Future[V])
extends ProxyStream[E, V](source) {
/**
* TODO: FutureStream handles exceptions differently than other streams. I need to decide what to do with it re FallbackStrategy.
* @param source The stream from which events are taken.
* @param f A function which takes an event from the source stream and returns a [[Future]] with its result.
* @tparam E The type of the events emitted by the stream constructed from the sources.
* @tparam V The type of the result of the future returned by the function.
*/
class FutureStream[E, V](source: Stream[E], f: E => Future[V]) extends ProxyStream[E, V](source) {
private val key = java.util.UUID.randomUUID()

override protected[signals3] def onEvent(event: E, sourceContext: Option[ExecutionContext]): Unit =
Serialized.future(key.toString)(f(event)).andThen {
case Success(v) => dispatch(v, sourceContext)
case Success(v) => dispatch(v, sourceContext)
case Failure(_: NoSuchElementException) => // do nothing to allow Future.filter/collect
case Failure(_) =>
case Failure(_) =>
}(using sourceContext.getOrElse(Threading.defaultContext))

}

class CollectStream[E, V](source: Stream[E], pf: PartialFunction[E, V])
extends ProxyStream[E, V](source) {
class CollectStream[E, V](source: Stream[E], pf: PartialFunction[E, V]) extends ProxyStream[E, V](source) {
override protected[signals3] def onEvent(event: E, sourceContext: Option[ExecutionContext]): Unit =
if (pf.isDefinedAt(event)) dispatch(pf(event), sourceContext)
}

class FilterStream[E](source: Stream[E], predicate: E => Boolean)
extends ProxyStream[E, E](source) {
override protected[signals3] def onEvent(event: E, sourceContext: Option[ExecutionContext]): Unit =
class FilterStream[E](source: Stream[E], predicate: E => Boolean) extends ProxyStream[E, E](source) {
override protected[signals3] def onEvent(event: E, sourceContext: Option[ExecutionContext]): Unit = {
if (predicate(event)) dispatch(event, sourceContext)
}
}

class JoinStream[E](sources: Stream[E]*)
extends ProxyStream[E, E](sources*) {
class JoinStream[E](sources: Stream[E]*) extends ProxyStream[E, E](sources*) {
override protected[signals3] def onEvent(event: E, sourceContext: Option[ExecutionContext]): Unit =
dispatch(event, sourceContext)
}

final class ScanStream[E, V](source: Stream[E], zero: V, f: (V, E) => V)
extends ProxyStream[E, V](source) {
class ScanStream[E, V](source: Stream[E], zero: V, f: (V, E) => V) extends ProxyStream[E, V](source) {
@volatile private var value = zero

override protected[signals3] def onEvent(event: E, sourceContext: Option[ExecutionContext]): Unit = {
Expand All @@ -69,7 +87,7 @@ private[signals3] object ProxyStream {
}
}

final class GroupedStream[E](source: Stream[E], n: Int) extends ProxyStream[E, Seq[E]](source) {
class GroupedStream[E](source: Stream[E], n: Int) extends ProxyStream[E, Seq[E]](source) {
require(n > 0, "n must be positive")
private val buffer = ArrayBuffer.empty[E]

Expand All @@ -79,19 +97,19 @@ private[signals3] object ProxyStream {
val res = buffer.toSeq
buffer.clear()
dispatch(res, sourceContext)
} // end if
}
}
}

final class GroupByStream[E](source: Stream[E], groupBy: E => Boolean) extends ProxyStream[E, Seq[E]](source) {
class GroupByStream[E](source: Stream[E], groupBy: E => Boolean) extends ProxyStream[E, Seq[E]](source) {
private val buffer = ArrayBuffer.empty[E]

override protected[signals3] def onEvent(event: E, sourceContext: Option[ExecutionContext]): Unit = {
if (groupBy(event)) {
val res = buffer.toSeq
buffer.clear()
if (res.nonEmpty) dispatch(res, sourceContext)
} // end if
}
buffer.addOne(event)
}
}
Expand All @@ -103,20 +121,21 @@ private[signals3] object ProxyStream {
}
}

final class DropStream[E](source: Stream[E], drop: Int) extends IndexedStream[E](source) {
class DropStream[E](source: Stream[E], drop: Int) extends IndexedStream[E](source) {
override protected[signals3] def onEvent(event: E, sourceContext: Option[ExecutionContext]): Unit =
if (counter < drop) inc() else dispatch(event, sourceContext)
}

final class DropWhileStream[E](source: Stream[E], p: E => Boolean) extends ProxyStream[E, E](source) {
class DropWhileStream[E](source: Stream[E], p: E => Boolean) extends ProxyStream[E, E](source) {
@volatile private var dropping = true
override protected[signals3] def onEvent(event: E, sourceContext: Option[ExecutionContext]): Unit = {
if (dropping) dropping = p(event)
if (!dropping) dispatch(event, sourceContext)
}
}

final class CloseableStream[E](source: Stream[E]) extends ProxyStream[E, E](source) with Closeable {
final class CloseableStream[E](source: Stream[E])
extends ProxyStream[E, E](source) with Closeable {
override protected[signals3] def onEvent(event: E, sourceContext: Option[ExecutionContext]): Unit =
if (!isClosed) dispatch(event, sourceContext)
}
Expand All @@ -127,39 +146,16 @@ private[signals3] object ProxyStream {
private var previousEvent: Option[E] = None

override protected[signals3] def onEvent(event: E, sourceContext: Option[ExecutionContext]): Unit = if (!isClosed) {
if (!p(event)) {
if (p(event)) {
dispatch(event, sourceContext)
previousEvent = Some(event)
} else {
close()
(lastPromise, previousEvent) match {
case (Some(p), Some(pe)) if !p.isCompleted => p.trySuccess(pe)
case _ =>
}
} else {
dispatch(event, sourceContext)
previousEvent = Some(event)
}
}
}

final class FlatMapStream[E, V](source: Stream[E], f: E => Stream[V])
extends Stream[V] with EventSubscriber[E] {
@volatile private var mapped: Option[Stream[V]] = None

private val subscriber = new EventSubscriber[V] {
override protected[signals3] def onEvent(event: V, currentContext: Option[ExecutionContext]): Unit =
dispatch(event, currentContext)
}

override protected[signals3] def onEvent(event: E, currentContext: Option[ExecutionContext]): Unit = {
mapped.foreach(_.unsubscribe(subscriber))
mapped = Some(f(event).tap(_.subscribe(subscriber)))
}

override protected def onWire(): Unit = source.subscribe(this)

override protected def onUnwire(): Unit = {
mapped.foreach(_.unsubscribe(subscriber))
mapped = None
source.unsubscribe(this)
}
}
}
Loading
Loading