Skip to content

Commit 18e0eb9

Browse files
authored
Merge pull request #2588 from mpilquist/topic/timeseries
Import scodec-stream and scodec-protocols in to fs2
2 parents 28617f6 + 64ea6ad commit 18e0eb9

File tree

74 files changed

+7437
-19
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+7437
-19
lines changed

.github/workflows/ci.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ jobs:
6868
run: sbt ++${{ matrix.scala }} microsite/mdoc
6969

7070
- name: Compress target directories
71-
run: tar cf targets.tar target node/js/target core/js/target core/jvm/target io/js/target reactive-streams/target io/jvm/target benchmark/target project/target
71+
run: tar cf targets.tar target node/js/target protocols/js/target core/js/target core/jvm/target scodec/jvm/target scodec/js/target io/js/target reactive-streams/target io/jvm/target protocols/jvm/target benchmark/target project/target
7272

7373
- name: Upload target directories
7474
uses: actions/upload-artifact@v2

.scalafmt.conf

+2-1
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,5 @@ rewrite.neverInfix.excludeFilters = [until
2828
have
2929
when
3030
size
31-
theSameElementsAs]
31+
theSameElementsAs
32+
at]

build.sbt

+76-5
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import com.typesafe.tools.mima.core._
22
import sbtcrossproject.crossProject
33

4-
addCommandAlias("fmt", "; compile:scalafmt; test:scalafmt; it:scalafmt; scalafmtSbt")
4+
addCommandAlias("fmt", "; Compile/scalafmt; Test/scalafmt; IntegrationTest/scalafmt; scalafmtSbt")
55
addCommandAlias(
66
"fmtCheck",
7-
"; compile:scalafmtCheck; test:scalafmtCheck; it:scalafmtCheck; scalafmtSbtCheck"
7+
"; Compile/scalafmtCheck; Test/scalafmtCheck; IntegrationTest/scalafmtCheck; scalafmtSbtCheck"
88
)
99
addCommandAlias("testJVM", ";rootJVM/test")
1010
addCommandAlias("testJS", "rootJS/test")
@@ -159,7 +159,19 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
159159
lazy val root = project
160160
.in(file("."))
161161
.enablePlugins(NoPublishPlugin, SonatypeCiReleasePlugin)
162-
.aggregate(coreJVM, coreJS, io.jvm, node.js, io.js, reactiveStreams, benchmark)
162+
.aggregate(
163+
coreJVM,
164+
coreJS,
165+
io.jvm,
166+
node.js,
167+
io.js,
168+
scodec.jvm,
169+
scodec.js,
170+
protocols.jvm,
171+
protocols.js,
172+
reactiveStreams,
173+
benchmark
174+
)
163175

164176
lazy val rootJVM = project
165177
.in(file("."))
@@ -257,7 +269,7 @@ lazy val io = crossProject(JVMPlatform, JSPlatform)
257269
.jsConfigure(_.enablePlugins(ScalaJSBundlerPlugin))
258270
.settings(
259271
name := "fs2-io",
260-
libraryDependencies += "com.comcast" %%% "ip4s-core" % "3.0-27-0b113c0",
272+
libraryDependencies += "com.comcast" %%% "ip4s-core" % "3.1.0",
261273
OsgiKeys.exportPackage := Seq("fs2.io.*"),
262274
OsgiKeys.privatePackage := Seq(),
263275
OsgiKeys.importPackage := {
@@ -287,6 +299,65 @@ lazy val io = crossProject(JVMPlatform, JSPlatform)
287299
.dependsOn(core % "compile->compile;test->test")
288300
.jsConfigure(_.dependsOn(node.js))
289301

302+
lazy val scodec = crossProject(JVMPlatform, JSPlatform)
303+
.in(file("scodec"))
304+
.enablePlugins(SbtOsgi)
305+
.jsConfigure(_.enablePlugins(ScalaJSBundlerPlugin))
306+
.settings(
307+
name := "fs2-scodec",
308+
libraryDependencies += "org.scodec" %%% "scodec-core" % (if (
309+
scalaVersion.value.startsWith("2.")
310+
)
311+
"1.11.9"
312+
else "2.1.0"),
313+
OsgiKeys.exportPackage := Seq("fs2.interop.scodec.*"),
314+
OsgiKeys.privatePackage := Seq(),
315+
OsgiKeys.importPackage := {
316+
val Some((major, minor)) = CrossVersion.partialVersion(scalaVersion.value)
317+
Seq(
318+
s"""scala.*;version="[$major.$minor,$major.${minor + 1})"""",
319+
"""fs2.*;version="${Bundle-Version}"""",
320+
"*"
321+
)
322+
},
323+
OsgiKeys.additionalHeaders := Map("-removeheaders" -> "Include-Resource,Private-Package"),
324+
osgiSettings,
325+
mimaPreviousArtifacts := mimaPreviousArtifacts.value.filter { v =>
326+
VersionNumber(v.revision).matchesSemVer(SemanticSelector(">3.2.0"))
327+
}
328+
)
329+
.jsSettings(
330+
scalaJSLinkerConfig ~= (_.withModuleKind(ModuleKind.CommonJSModule))
331+
)
332+
.dependsOn(core % "compile->compile;test->test", io % "test")
333+
334+
lazy val protocols = crossProject(JVMPlatform, JSPlatform)
335+
.in(file("protocols"))
336+
.enablePlugins(SbtOsgi)
337+
.jsConfigure(_.enablePlugins(ScalaJSBundlerPlugin))
338+
.settings(
339+
name := "fs2-protocols",
340+
OsgiKeys.exportPackage := Seq("fs2.protocols.*"),
341+
OsgiKeys.privatePackage := Seq(),
342+
OsgiKeys.importPackage := {
343+
val Some((major, minor)) = CrossVersion.partialVersion(scalaVersion.value)
344+
Seq(
345+
s"""scala.*;version="[$major.$minor,$major.${minor + 1})"""",
346+
"""fs2.*;version="${Bundle-Version}"""",
347+
"*"
348+
)
349+
},
350+
OsgiKeys.additionalHeaders := Map("-removeheaders" -> "Include-Resource,Private-Package"),
351+
osgiSettings,
352+
mimaPreviousArtifacts := mimaPreviousArtifacts.value.filter { v =>
353+
VersionNumber(v.revision).matchesSemVer(SemanticSelector(">3.2.0"))
354+
}
355+
)
356+
.jsSettings(
357+
scalaJSLinkerConfig ~= (_.withModuleKind(ModuleKind.CommonJSModule))
358+
)
359+
.dependsOn(core % "compile->compile;test->test", scodec, io)
360+
290361
lazy val reactiveStreams = project
291362
.in(file("reactive-streams"))
292363
.enablePlugins(SbtOsgi)
@@ -337,7 +408,7 @@ lazy val microsite = project
337408
githubWorkflowArtifactUpload := false,
338409
fatalWarningsInCI := false
339410
)
340-
.dependsOn(coreJVM, io.jvm, reactiveStreams)
411+
.dependsOn(coreJVM, io.jvm, reactiveStreams, scodec.jvm)
341412
.enablePlugins(MdocPlugin, NoPublishPlugin)
342413

343414
ThisBuild / githubWorkflowBuildPostamble ++= List(
+259
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
/*
2+
* Copyright (c) 2013 Functional Streams for Scala
3+
*
4+
* Permission is hereby granted, free of charge, to any person obtaining a copy of
5+
* this software and associated documentation files (the "Software"), to deal in
6+
* the Software without restriction, including without limitation the rights to
7+
* use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
8+
* the Software, and to permit persons to whom the Software is furnished to do so,
9+
* subject to the following conditions:
10+
*
11+
* The above copyright notice and this permission notice shall be included in all
12+
* copies or substantial portions of the Software.
13+
*
14+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
16+
* FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
17+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
18+
* IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
19+
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20+
*/
21+
22+
// Adapted from scodec-protocols, licensed under 3-clause BSD
23+
package fs2
24+
25+
import cats.{Contravariant, Functor}
26+
import cats.data.AndThen
27+
import cats.arrow.Strong
28+
29+
/** A stateful transformation of the elements of a stream.
30+
*
31+
* A scan is primarily represented as a function `(S, I) => (S, Chunk[O])`.
32+
* Scans also have an initial state value of type `S` and the ability to emit
33+
* elements upon completion via a function `S => Chunk[O]`.
34+
*
35+
* A scan is built up incrementally via various combinators and then converted to
36+
* a pipe via `.toPipe`. For example, `s.through(Scan.lift(identity).toPipe) == s`.
37+
*
38+
* A scan is much less powerful than a pull. Scans cannot evaluate effects or terminate
39+
* early. These limitations allow combinators that are not possible on pulls though.
40+
* For example, the [[first]] method converts a `Scan[S, I, O]` to a `Scan[S, (I, A), (O, A)]`.
41+
* Critically, this method relies on the ability to feed a single `I` to the original scan
42+
* and collect the resulting `O` values, pairing each `O` with the `A` that was paired with `I`.
43+
*/
44+
final class Scan[S, -I, +O](
45+
val initial: S,
46+
private val transform_ : AndThen[(S, I), (S, Chunk[O])],
47+
private val onComplete_ : AndThen[S, Chunk[O]]
48+
) {
49+
50+
/** Transformation function. */
51+
def transform(s: S, i: I): (S, Chunk[O]) = transform_((s, i))
52+
53+
/** Chunk form of [[transform]]. */
54+
def transformAccumulate(s: S, c: Chunk[I]): (S, Chunk[O]) =
55+
// Same as: c.traverse(i => State(transform(_, i))).map(_.flatten).run(s).value
56+
c.foldLeft(s -> Chunk.empty[O]) { case ((s, acc), i) =>
57+
val (s2, os) = transform(s, i)
58+
(s2, acc ++ os)
59+
}
60+
61+
/** Completion function. */
62+
def onComplete(s: S): Chunk[O] = onComplete_(s)
63+
64+
/** Converts this scan to a pipe. */
65+
def toPipe[F[_]]: Stream[F, I] => Stream[F, O] =
66+
_.pull
67+
.scanChunks(initial)(transformAccumulate)
68+
.flatMap(state => Pull.output(onComplete(state)))
69+
.stream
70+
71+
/** Steps this scan by a single input, returning a new scan and the output elements computed from the input. */
72+
def step(i: I): (Scan[S, I, O], Chunk[O]) = {
73+
val (s, os) = transform(initial, i)
74+
(new Scan(s, transform_, onComplete_), os)
75+
}
76+
77+
/** Composes the supplied scan with this scan.
78+
*
79+
* The resulting scan maintains the state of each of the input scans independently.
80+
*/
81+
def andThen[S2, O2](that: Scan[S2, O, O2]): Scan[(S, S2), I, O2] =
82+
Scan[(S, S2), I, O2]((initial, that.initial))(
83+
{ case ((s, s2), i) =>
84+
val (sp, os) = transform(s, i)
85+
val (s2p, out) = that.transformAccumulate(s2, os)
86+
((sp, s2p), out)
87+
},
88+
{ case (s, s2) =>
89+
val (s3, out) = that.transformAccumulate(s2, onComplete(s))
90+
out ++ that.onComplete(s3)
91+
}
92+
)
93+
94+
/** Returns a new scan which transforms output values using the supplied function. */
95+
def map[O2](f: O => O2): Scan[S, I, O2] =
96+
new Scan(
97+
initial,
98+
transform_.andThen[(S, Chunk[O2])] { case (s, os) => (s, os.map(f)) },
99+
onComplete_.andThen(_.map(f))
100+
)
101+
102+
/** Returns a new scan which transforms input values using the supplied function. */
103+
def contramap[I2](f: I2 => I): Scan[S, I2, O] =
104+
new Scan(
105+
initial,
106+
AndThen[(S, I2), (S, I)] { case (s, i2) => (s, f(i2)) }.andThen(transform_),
107+
onComplete_
108+
)
109+
110+
def dimap[I2, O2](g: I2 => I)(f: O => O2): Scan[S, I2, O2] =
111+
Scan[S, I2, O2](initial)(
112+
{ (s, i2) =>
113+
val (s2, os) = transform(s, g(i2))
114+
(s2, os.map(f))
115+
},
116+
onComplete_.andThen(_.map(f))
117+
)
118+
119+
/** Transforms the state type. */
120+
def imapState[S2](g: S => S2)(f: S2 => S): Scan[S2, I, O] =
121+
Scan[S2, I, O](g(initial))(
122+
{ (s2, i) =>
123+
val (s3, os) = transform(f(s2), i)
124+
(g(s3), os)
125+
},
126+
AndThen(f).andThen(onComplete_)
127+
)
128+
129+
/** Returns a new scan with transformed input and output types.
130+
*
131+
* Upon receiving an `I2`, `get` is invoked and the result is fed to the
132+
* original scan. For each output value, `set` is invoked with the original
133+
* `I2` input and the computed `O`, yielding a new output of type `O2`.
134+
*/
135+
def lens[I2, O2](get: I2 => I, set: (I2, O) => O2): Scan[S, I2, O2] =
136+
Scan[S, I2, O2](initial)(
137+
{ (s, i2) =>
138+
val (s2, os) = transform(s, get(i2))
139+
(s2, os.map(s => set(i2, s)))
140+
},
141+
_ => Chunk.empty
142+
)
143+
144+
/** Returns a scan that inputs/outputs pairs of elements, with `I` and `O` in the first element of the pair. */
145+
def first[A]: Scan[S, (I, A), (O, A)] =
146+
lens(_._1, (t, o) => (o, t._2))
147+
148+
/** Returns a scan that inputs/outputs pairs of elements, with `I` and `O` in the second element of the pair. */
149+
def second[A]: Scan[S, (A, I), (A, O)] =
150+
lens(_._2, (t, o) => (t._1, o))
151+
152+
/** Like [[lens]] but some elements are passed to the output (skipping the original scan) while other elements
153+
* are lensed through the original scan.
154+
*/
155+
def semilens[I2, O2](extract: I2 => Either[O2, I], inject: (I2, O) => O2): Scan[S, I2, O2] =
156+
Scan[S, I2, O2](initial)(
157+
(s, i2) =>
158+
extract(i2).fold(
159+
o2 => s -> Chunk.singleton(o2),
160+
i => {
161+
val (s2, os) = transform(s, i)
162+
(s2, os.map(o => inject(i2, o)))
163+
}
164+
),
165+
_ => Chunk.empty
166+
)
167+
168+
/** Like [[semilens]] but the elements of the original scan are output directly. */
169+
def semipass[I2, O2 >: O](extract: I2 => Either[O2, I]): Scan[S, I2, O2] =
170+
semilens(extract, (_, o) => o)
171+
172+
/** Returns a scan that wraps the inputs/outputs with `Either`.
173+
* Elements on the left pass through the original scan while elements on
174+
* the right pass through directly.
175+
*/
176+
def left[A]: Scan[S, Either[I, A], Either[O, A]] =
177+
semilens(_.fold(i => Right(i), a => Left(Right(a))), (_, o) => Left(o))
178+
179+
/** Returns a scan that wraps the inputs/outputs with `Either`.
180+
* Elements on the right pass through the original scan while elements on
181+
* the left pass through directly.
182+
*/
183+
def right[A]: Scan[S, Either[A, I], Either[A, O]] =
184+
semilens(_.fold(a => Left(Left(a)), i => Right(i)), (_, o) => Right(o))
185+
186+
/** Combines this scan with the supplied scan such that elements on the left
187+
* are fed through this scan while elements on the right are fed through the
188+
* suppplied scan. The outputs are joined together.
189+
*/
190+
def choice[S2, I2, O2 >: O](that: Scan[S2, I2, O2]): Scan[(S, S2), Either[I, I2], O2] =
191+
Scan[(S, S2), Either[I, I2], O2]((initial, that.initial))(
192+
{ case ((s, s2), e) =>
193+
e match {
194+
case Left(i) =>
195+
val (sp, os) = transform(s, i)
196+
((sp, s2), os)
197+
case Right(i2) =>
198+
val (s2p, o2s) = that.transform(s2, i2)
199+
((s, s2p), o2s)
200+
}
201+
},
202+
{ case (s, s2) => onComplete(s) ++ that.onComplete(s2) }
203+
)
204+
205+
/** Like [[choice]] but the output elements are kept separate. */
206+
def choose[S2, I2, O2](t: Scan[S2, I2, O2]): Scan[(S, S2), Either[I, I2], Either[O, O2]] =
207+
Scan[(S, S2), Either[I, I2], Either[O, O2]]((initial, t.initial))(
208+
{ case ((s, s2), e) =>
209+
e match {
210+
case Left(i) =>
211+
val (sp, os) = transform(s, i)
212+
((sp, s2), os.map(Left(_)))
213+
case Right(i2) =>
214+
val (s2p, o2s) = t.transform(s2, i2)
215+
((s, s2p), o2s.map(Right(_)))
216+
}
217+
},
218+
{ case (s, s2) => onComplete(s).map(Left(_)) ++ t.onComplete(s2).map(Right(_)) }
219+
)
220+
}
221+
222+
object Scan {
223+
224+
def apply[S, I, O](
225+
initial: S
226+
)(transform: (S, I) => (S, Chunk[O]), onComplete: S => Chunk[O]): Scan[S, I, O] =
227+
new Scan(initial, AndThen { case (s, i) => transform(s, i) }, AndThen(onComplete))
228+
229+
def stateful[S, I, O](initial: S)(transform: (S, I) => (S, Chunk[O])): Scan[S, I, O] =
230+
apply(initial)(transform, _ => Chunk.empty)
231+
232+
def stateful1[S, I, O](initial: S)(f: (S, I) => (S, O)): Scan[S, I, O] =
233+
stateful[S, I, O](initial) { (s, i) =>
234+
val (s2, o) = f(s, i); s2 -> Chunk.singleton(o)
235+
}
236+
237+
def stateless[I, O](f: I => Chunk[O]): Scan[Unit, I, O] =
238+
stateful[Unit, I, O](())((u, i) => (u, f(i)))
239+
240+
def lift[I, O](f: I => O): Scan[Unit, I, O] =
241+
stateless(i => Chunk.singleton(f(i)))
242+
243+
implicit def functor[S, I]: Functor[Scan[S, I, *]] =
244+
new Functor[Scan[S, I, *]] {
245+
def map[O, O2](s: Scan[S, I, O])(f: O => O2) = s.map(f)
246+
}
247+
248+
implicit def contravariant[S, O]: Contravariant[Scan[S, *, O]] =
249+
new Contravariant[Scan[S, *, O]] {
250+
def contramap[I, I2](s: Scan[S, I, O])(f: I2 => I) = s.contramap(f)
251+
}
252+
253+
implicit def strong[S]: Strong[Scan[S, *, *]] = new Strong[Scan[S, *, *]] {
254+
def first[A, B, C](fa: Scan[S, A, B]): Scan[S, (A, C), (B, C)] = fa.first
255+
def second[A, B, C](fa: Scan[S, A, B]): Scan[S, (C, A), (C, B)] = fa.second
256+
def dimap[A, B, C, D](fab: Scan[S, A, B])(f: C => A)(g: B => D): Scan[S, C, D] =
257+
fab.dimap(f)(g)
258+
}
259+
}

0 commit comments

Comments
 (0)