Skip to content

Commit 07cb68f

Browse files
authored
Merge pull request #234 from evolution-gaming/sequentially-ce-metrics
Sequentially ce metrics
2 parents 6797dd5 + 8755154 commit 07cb68f

File tree

8 files changed

+866
-11
lines changed

8 files changed

+866
-11
lines changed

benchmark/src/main/scala/com/evolutiongaming/concurrent/sequentially/SequentiallyCatsVsAkkaBenchmark.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class SequentiallyCatsVsAkkaBenchmark extends Common {
4040
dispatcher = disp
4141
dispatcherCleanup = dispClean
4242

43-
val (seq, clean) = SequentiallyF.resource[IO, Int].allocated.unsafeRunSync()(runtime)
43+
val (seq, clean) = SequentiallyF.resource[IO, Int]().allocated.unsafeRunSync()(runtime)
4444
catsSequentially = seq
4545
catsCleanup = clean
4646
}

build.sbt

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,26 @@ lazy val `sequentially-ce` = projectMatrix
9797
)
9898
.jvmPlatform(scalaVersions = scalaVersions)
9999

100+
lazy val `sequentially-ce-metrics` = projectMatrix
101+
.in(file("sequentially-ce-metrics"))
102+
.settings(commonSettings)
103+
.settings(
104+
name := "sequentially-ce-metrics"
105+
)
106+
.settings(
107+
libraryDependencies ++= Seq(
108+
PrometheusTools,
109+
Dependencies.CatsEffect.effect,
110+
Scalatest % Test,
111+
),
112+
excludeDependencies ++= Seq(
113+
ExclusionRule("com.typesafe.akka"),
114+
ExclusionRule("org.apache.pekko"),
115+
),
116+
)
117+
.jvmPlatform(scalaVersions = scalaVersions)
118+
.dependsOn(`sequentially-ce`)
119+
100120
lazy val benchmark = project
101121
.in(file("benchmark"))
102122
.settings(commonSettings)
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package com.evolutiongaming.concurrent
2+
3+
import cats.effect.kernel.Async
4+
import cats.effect.std.Dispatcher
5+
import cats.syntax.all.*
6+
import com.evolutiongaming.concurrent.sequentially.SequentiallyF
7+
8+
import scala.concurrent.Future
9+
10+
/** Wrapper around SequentiallyF that adds metrics tracking.
11+
* Since SequentiallyF is a final class, we wrap it by delegation.
12+
*/
13+
final class MeteredSequentiallyF[F[_]: Async, -K] private (
14+
private val sequentially: SequentiallyF[F, K],
15+
private val metrics: SequentiallyMetricsF[F],
16+
) {
17+
18+
def apply[T](
19+
key: K
20+
)(
21+
task: => T
22+
)(implicit
23+
dispatcher: Dispatcher[F]
24+
): Future[T] = {
25+
dispatcher.unsafeToFuture(applyF(key)(Async[F].delay(task)))
26+
}
27+
28+
def applyF[T](key: K)(task: => F[T]): F[T] = {
29+
val start = System.nanoTime()
30+
31+
metrics.queue(start) *> metrics.run(sequentially.applyF(key)(task))
32+
}
33+
}
34+
35+
object MeteredSequentiallyF {
36+
37+
def apply[F[_]: Async, K](
38+
sequentially: SequentiallyF[F, K],
39+
name: String,
40+
sequentiallyMetrics: SequentiallyMetricsF.Factory[F],
41+
): MeteredSequentiallyF[F, K] = {
42+
apply(sequentially, sequentiallyMetrics(name))
43+
}
44+
45+
def apply[F[_]: Async, K](
46+
sequentially: SequentiallyF[F, K],
47+
metrics: SequentiallyMetricsF[F],
48+
): MeteredSequentiallyF[F, K] = {
49+
new MeteredSequentiallyF(sequentially, metrics)
50+
}
51+
52+
trait Factory[F[_]] {
53+
def apply[K](name: String): MeteredSequentiallyF[F, K]
54+
}
55+
56+
object Factory {
57+
58+
trait Provider[F[_]] {
59+
def apply[K]: SequentiallyF[F, K]
60+
}
61+
62+
def apply[F[_]: Async](
63+
provider: Provider[F],
64+
sequentiallyMetrics: SequentiallyMetricsF.Factory[F],
65+
): Factory[F] = new Factory[F] {
66+
override def apply[K](name: String): MeteredSequentiallyF[F, K] =
67+
MeteredSequentiallyF(provider.apply[K], sequentiallyMetrics(name))
68+
}
69+
}
70+
71+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package com.evolutiongaming.concurrent
2+
3+
import cats.effect.kernel.Sync
4+
import cats.syntax.all.*
5+
import com.evolutiongaming.prometheus.PrometheusHelper.*
6+
import io.prometheus.client.{CollectorRegistry, Summary}
7+
8+
trait SequentiallyMetricsF[F[_]] {
9+
def queue(startNanos: Long): F[Unit]
10+
def run[T](task: => F[T]): F[T]
11+
}
12+
13+
object SequentiallyMetricsF {
14+
15+
type Factory[F[_]] = String => SequentiallyMetricsF[F]
16+
17+
object Factory {
18+
19+
/** @note Must be singleton as metric names must be unique.
20+
* @see CollectorRegistry#register
21+
*/
22+
def apply[F[_]: Sync](
23+
prometheusRegistry: CollectorRegistry,
24+
prefix: String = "sequentially",
25+
): Factory[F] = {
26+
val time = Summary
27+
.build()
28+
.name(s"${ prefix }_time")
29+
.help("Latency of Sequentially operations (queue, run) (by name)")
30+
.labelNames("name", "operation")
31+
.defaultQuantiles()
32+
.register(prometheusRegistry)
33+
34+
name =>
35+
new SequentiallyMetricsF[F] {
36+
def queue(startNanos: Long): F[Unit] = {
37+
Sync[F].delay {
38+
time.labels(name, "queue").timeTillNowNanos(startNanos)
39+
}
40+
}
41+
42+
def run[T](task: => F[T]): F[T] = {
43+
Sync[F].defer {
44+
val start = System.nanoTime()
45+
task.flatMap { result =>
46+
Sync[F].delay {
47+
time.labels(name, "run").observe((System.nanoTime() - start).toDouble / 1e9)
48+
result
49+
}
50+
}
51+
}
52+
}
53+
}
54+
}
55+
}
56+
}
57+

0 commit comments

Comments
 (0)