Skip to content
This repository was archived by the owner on Jun 14, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
organization := "com.emarsys"
name := "scheduler"
organization := "com.emarsys"
name := "scheduler"
crossScalaVersions := List("2.13.10", "2.12.17")

scalacOptions ++= scalacOptionsFor(scalaVersion.value)

scalafmtOnCompile := true

libraryDependencies += "org.typelevel" %% "cats-core" % "2.9.0"
libraryDependencies += "org.typelevel" %% "cats-effect" % "2.5.5"
libraryDependencies += "org.typelevel" %% "cats-effect" % "3.5.0"
libraryDependencies += "org.scalacheck" %% "scalacheck" % "1.17.0" % Test
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.15" % Test
libraryDependencies += "org.typelevel" %% "cats-laws" % "2.9.0" % Test
libraryDependencies += "org.typelevel" %% "discipline-scalatest" % "2.2.0" % Test
libraryDependencies += "org.typelevel" %% "cats-effect-laws" % "2.5.5" % Test
libraryDependencies += "org.typelevel" %% "cats-effect-laws" % "3.5.0" % Test

addCompilerPlugin("org.typelevel" %% "kind-projector" % "0.13.2" cross CrossVersion.full)

Expand Down
27 changes: 14 additions & 13 deletions src/main/scala/com/emarsys/scheduler/Schedule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package com.emarsys.scheduler

import cats.{Applicative, Apply, Bifunctor, Eq, Functor, Monad, MonadError}
import cats.arrow.Profunctor
import cats.effect.{Async, Timer}
import cats.effect.Async
import cats.syntax.all._

import scala.concurrent.duration._
import cats.effect.Temporal

trait Schedule[F[+_], -A, +B] {
type State
Expand Down Expand Up @@ -40,11 +41,11 @@ object Schedule extends Scheduler with ScheduleInstances with PredefinedSchedule
trait Scheduler {
import Schedule.Decision

def run[F[+_]: Monad: Timer, A, B](F: F[A], schedule: Schedule[F, A, B]): F[B] = {
def run[F[+_]: Monad: Temporal, A, B](F: F[A], schedule: Schedule[F, A, B]): F[B] = {
def loop(decision: Decision[schedule.State, B]): F[B] =
if (decision.continue)
for {
_ <- Timer[F].sleep(decision.delay)
_ <- Temporal[F].sleep(decision.delay)
a <- F
d <- schedule.update(a, decision.state)
b <- loop(d)
Expand All @@ -54,19 +55,19 @@ trait Scheduler {
schedule.initial
.flatMap(initial =>
for {
_ <- Timer[F].sleep(initial.delay)
_ <- Temporal[F].sleep(initial.delay)
a <- F
d <- schedule.update(a, initial.state)
} yield d
)
.flatMap(loop)
}

def retry[E, F[+_]: MonadError[_[_], E]: Timer, A, B](F: F[A], policy: Schedule[F, E, B]): F[A] = {
def retry[E, F[+_]: MonadError[_[_], E]: Temporal, A, B](F: F[A], policy: Schedule[F, E, B]): F[A] = {
def loop(decision: Decision[policy.State, B]): PartialFunction[E, F[A]] = {
case e if decision.continue =>
for {
_ <- Timer[F].sleep(decision.delay)
_ <- Temporal[F].sleep(decision.delay)
next <- policy.update(e, decision.state)
a <- F.recoverWith(loop(next))
} yield a
Expand All @@ -75,7 +76,7 @@ trait Scheduler {
F recoverWith { case e =>
for {
initial <- policy.initial
_ <- Timer[F].sleep(initial.delay)
_ <- Temporal[F].sleep(initial.delay)
d <- policy.update(e, initial.state)
a <- F.recoverWith(loop(d))
} yield a
Expand Down Expand Up @@ -186,7 +187,7 @@ trait PredefinedSchedules {
def spaced[F[+_]: Monad](interval: FiniteDuration): Schedule[F, Any, Int] =
forever.space(interval)

def fixed[F[+_]: Monad: Timer](interval: FiniteDuration): Schedule[F, Any, FiniteDuration] =
def fixed[F[+_]: Monad: Temporal](interval: FiniteDuration): Schedule[F, Any, FiniteDuration] =
Schedule.mapDecision(elapsed.map(elapsed => (interval - elapsed) max Duration.Zero)) { d =>
val (_, n) = d.state
d.copy(
Expand All @@ -195,15 +196,15 @@ trait PredefinedSchedules {
)
}

def elapsed[F[+_]: Functor: Timer]: Schedule.Aux[F, (Long, Long), Any, FiniteDuration] =
def elapsed[F[+_]: Functor: Temporal]: Schedule.Aux[F, (Long, Long), Any, FiniteDuration] =
timing.map { case (s, n) => (n - s).nanos }

def timing[F[+_]: Functor: Timer]: Schedule.Aux[F, (Long, Long), Any, (Long, Long)] =
unfoldM(Timer[F].clock.monotonic(NANOSECONDS).map(n => (n, n))) { case (s, _) =>
Timer[F].clock.monotonic(NANOSECONDS).map(n => (s, n))
def timing[F[+_]: Functor: Temporal]: Schedule.Aux[F, (Long, Long), Any, (Long, Long)] =
unfoldM(Temporal[F].clock.monotonic(NANOSECONDS).map(n => (n, n))) { case (s, _) =>
Temporal[F].clock.monotonic(NANOSECONDS).map(n => (s, n))
}

def maxFor[F[+_]: Monad: Timer](timeCap: FiniteDuration): Schedule[F, Any, FiniteDuration] =
def maxFor[F[+_]: Monad: Temporal](timeCap: FiniteDuration): Schedule[F, Any, FiniteDuration] =
elapsed.reconsider(_.result < timeCap)

def continueOn[F[+_]: Monad](b: Boolean): Schedule[F, Boolean, Int] =
Expand Down
12 changes: 6 additions & 6 deletions src/main/scala/com/emarsys/scheduler/Syntax.scala
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package com.emarsys.scheduler

import cats.{Monad, MonadError}
import cats.effect.Timer
import Schedule.Decision

import scala.concurrent.duration.FiniteDuration
import cats.effect.Temporal

trait Syntax {
implicit def toScheduleOps[F[+_]: Monad: Timer, A](fa: F[A]) = new ScheduleOps(fa)
implicit def toRetryOps[E, F[+_]: MonadError[*[_], E]: Timer, A](fa: F[A]) = new RetryOps(fa)
implicit def toCombinators[F[+_]: Monad, A, B](s: Schedule[F, A, B]) = new ScheduleCombinators(s)
implicit def toScheduleOps[F[+_]: Monad: Temporal, A](fa: F[A]) = new ScheduleOps(fa)
implicit def toRetryOps[E, F[+_]: MonadError[*[_], E]: Temporal, A](fa: F[A]) = new RetryOps(fa)
implicit def toCombinators[F[+_]: Monad, A, B](s: Schedule[F, A, B]) = new ScheduleCombinators(s)
}

final class ScheduleOps[F[+_]: Monad: Timer, A](fa: F[A]) {
final class ScheduleOps[F[+_]: Monad: Temporal, A](fa: F[A]) {
def runOn[B](schedule: Schedule[F, A, B]) = Schedule.run(fa, schedule)
}

final class RetryOps[E, F[+_]: MonadError[*[_], E]: Timer, A](fa: F[A]) {
final class RetryOps[E, F[+_]: MonadError[*[_], E]: Temporal, A](fa: F[A]) {
def retry[B](policy: Schedule[F, E, B]) = Schedule.retry(fa, policy)
}

Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/com/emarsys/scheduler/RetrySpec.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.emarsys.scheduler

import cats.effect.IO
import cats.effect.concurrent.Ref
import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.matchers.should.Matchers

import scala.concurrent.duration._
import scala.util._
import cats.effect.Ref

class RetrySpec extends AnyWordSpec with Matchers {
import Schedule.Decision
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/com/emarsys/scheduler/SchedulerSpec.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.emarsys.scheduler

import cats.effect.IO
import cats.effect.concurrent.Ref
import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.matchers.should.Matchers

import scala.concurrent.duration._
import scala.util._
import cats.effect.Ref

class SchedulerSpec extends AnyWordSpec with Matchers {
import syntax._
Expand Down