Skip to content

Acked streams scala 2.13 #195

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ jdk:
services:
- rabbitmq

script: sbt '+ test'

sudo: false

cache:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,141 +1,68 @@
package com.spingo.op_rabbit
package stream

import akka.actor.{ActorRef,Props}
import akka.actor.FSM
import akka.pattern.ask
import akka.stream.scaladsl.Sink
import akka.stream.actor._
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration._
import com.timcharper.acked.AckedSink
import scala.util.{Try,Success,Failure}

private [stream] object MessagePublisherSinkActor {
sealed trait State
case object Running extends State
case object Stopping extends State
case object AllDoneFuturePlease
}

private class MessagePublisherSinkActor(rabbitControl: ActorRef, timeoutAfter: FiniteDuration, qos: Int) extends ActorSubscriber with FSM[MessagePublisherSinkActor.State, Unit] {
import ActorSubscriberMessage._
import MessagePublisherSinkActor._

private val queue = scala.collection.mutable.Map.empty[Long, Promise[Unit]]
private val completed = Promise[Unit]

startWith(Running, ())

override val requestStrategy = new MaxInFlightRequestStrategy(max = qos) {
override def inFlightInternally: Int = queue.size
}

override def postRestart(reason: Throwable): Unit = {
stopWith(Failure(reason))
super.postRestart(reason)
}

private def stopWith(reason: Try[Unit]): Unit = {
context stop self
completed.tryComplete(reason)
}

when(Running) {
case Event(response: Message.ConfirmResponse, _) =>
handleResponse(response)
stay

case Event(OnError(e), _) =>
completed.tryFailure(e)
goto(Stopping)

case Event(OnComplete, _) =>
goto(Stopping)
}

when(Stopping) {
case Event(response: Message.ConfirmResponse, _) =>
handleResponse(response)
if(queue.isEmpty)
stop
else
stay
}

whenUnhandled {
case Event(OnNext((p: Promise[Unit] @unchecked, msg: Message)), _) =>
queue(msg.id) = p
rabbitControl ! msg
stay

case Event(MessagePublisherSinkActor.AllDoneFuturePlease,_) =>
sender ! completed.future
stay
}

onTransition {
case Running -> Stopping if queue.isEmpty =>
stopWith(Success(()))
}

onTermination {
case e: StopEvent =>
stopWith(Success(()))
}
import akka.actor.{ActorRef,Props}
import akka.actor.typed.scaladsl.Behaviors
import akka.pattern.ask
import akka.stream.{Attributes, Inlet, SinkShape}
import akka.stream.scaladsl.Source
import akka.stream.stage.{AbstractInHandler, GraphStage, GraphStageLogic}

private val handleResponse: Message.ConfirmResponse => Unit = {
case Message.Ack(id) =>
queue.remove(id).get.success(())
import com.timcharper.acked.{AckTup, AckedSink}
import akka.stream.scaladsl.Flow
import akka.actor.typed.Behavior
import akka.actor.ActorSystem
import akka.actor.Actor

case Message.Nack(id) =>
queue.remove(id).get.failure(new MessageNacked(id))
object MessagePublisherSink {
private type In = AckTup[Message]

case Message.Fail(id, exception: Throwable) =>
queue.remove(id).get.failure(exception)
def acked(name: String, rabbitControl: ActorRef, actorSystem: ActorSystem) = AckedSink {
MessagePublisherSink(rabbitControl, actorSystem).named(name)
}
}

/**
A MessagePublisherSink (an [[https://github.com/timcharper/acked-stream/blob/master/src/main/scala/com/timcharper/acked/AckedSink.scala AckedSink]]) publishes each input [[Message]], and either acks or fails the upstream element, depending on [[Message$.ConfirmResponse ConfirmResponse]].

Using a [[RabbitSource$ RabbitSource]] with a [[MessagePublisherSink$ MessagePublisherSink]] is a great way to get persistent, recoverable streams.
case class MessagePublisherSink(rabbitControl: ActorRef, actorSystem: ActorSystem) extends GraphStage[SinkShape[AckTup[Message]]] {
import MessagePublisherSink.In

Note - MessagePublisherSink uses ActorPublisher and due to AkkaStream limitations, it DOES NOT abide your configured supervisor strategy.
val in: Inlet[In] = Inlet.create("MessagePublisherSinkActor.in")

== [[com.spingo.op_rabbit.Message$.ConfirmResponse Message.ConfirmResponse]] handling ==
override val shape: SinkShape[In] = SinkShape.of(in)

After the sink publishes the [[Message]], it listens for the [[Message$.ConfirmResponse Message.ConfirmResponse]], and handles it accordingly:
override def createLogic(inheritedAttributes: Attributes) =
new GraphStageLogic(shape) {
val queue = scala.collection.mutable.Map.empty[Long, Promise[Unit]]
val completed = Promise[Unit]

- On [[Message$.Ack Message.Ack]], ack the upstream element.

- On [[Message$.Nack Message.Nack]], fail the upstream element with
[[MessageNacked]]. '''Does not''' throw a stream
exception. Processing continues.

- On [[Message$.Fail Message.Fail]], fail the upstream element with
publisher exception. '''Does not''' throw a stream
exception. Processing continues.

== Future[Unit] materialized type: ==

This sinks materialized type is Future[Unit]. The following applies:

- It yields any upstream failure as soon as it reaches the sink (potentially before messages are confirmed).
- After the stream completes, and all [[Message$.ConfirmResponse Message.ConfirmResponse]]'s have have been processed, the Future[Unit] is completed.
*/
object MessagePublisherSink {
/**
@param rabbitControl An actor
@param timeoutAfter The duration for which we'll wait for a message to be acked; note, timeouts and non-acknowledged messages will cause the upstream elements to fail. The sink will not throw an exception.
*/
def apply(rabbitControl: ActorRef, timeoutAfter: FiniteDuration = 30 seconds, qos: Int = 8): AckedSink[Message, Future[Unit]] = AckedSink {
Sink.actorSubscriber[(Promise[Unit], Message)](Props(new MessagePublisherSinkActor(rabbitControl, timeoutAfter, qos))).
mapMaterializedValue { subscriber =>
implicit val akkaTimeout = akka.util.Timeout(timeoutAfter)
implicit val ec = SameThreadExecutionContext

(subscriber ? MessagePublisherSinkActor.AllDoneFuturePlease).mapTo[Future[Unit]].flatMap(identity)
class ConfirmationActor extends Actor {
override def receive: Receive = {
case Message.Ack(id) =>
queue.remove(id).get.success(())
case Message.Nack(id) =>
queue.remove(id).get.failure(new MessageNacked(id))
case Message.Fail(id, exception: Throwable) =>
queue.remove(id).get.failure(exception)
}
}
}

val confirmationActor = actorSystem.actorOf(Props[ConfirmationActor])

setHandler(in, new AbstractInHandler() {
override def onPush() {
val (promise, msg) = grab(in)
queue(msg.id) = promise
rabbitControl.tell(msg, sender = confirmationActor)
}

override def onUpstreamFinish() {
completed.success(())
super.onUpstreamFinish()
}
})
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.spingo.op_rabbit
package stream

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Try,Failure}

import akka.actor._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink}
Expand All @@ -11,8 +14,6 @@ import com.spingo.op_rabbit.helpers.RabbitTestHelpers
import com.timcharper.acked.AckedSource
import com.spingo.scoped_fixtures.ScopedFixtures
import org.scalatest.{FunSpec, Matchers}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Try,Failure}

class MessagePublisherSinkSpec extends FunSpec with ScopedFixtures with Matchers with RabbitTestHelpers {
implicit val executionContext = ExecutionContext.global
Expand Down Expand Up @@ -62,9 +63,9 @@ class MessagePublisherSinkSpec extends FunSpec with ScopedFixtures with Matchers

val published = AckedSource(data).
map(Message.queue(_, queueName)).
runWith(MessagePublisherSink(rabbitControl))
to(MessagePublisherSink.acked("test-sink", rabbitControl, actorSystem))

await(published)
published.run()
await(Future.sequence(data.map(_._1.future))) // this asserts that all of the promises were fulfilled
await(consumed) should be (range)
}
Expand All @@ -73,20 +74,18 @@ class MessagePublisherSinkSpec extends FunSpec with ScopedFixtures with Matchers
it("propagates publish exceptions to promise") {
new RabbitFixtures {
val factory = Message.factory(Publisher.queue(Queue.passive("no-existe")))
val sink = MessagePublisherSink(rabbitControl)
val sink = MessagePublisherSink.acked("test-sink", rabbitControl, actorSystem)

val data = range map { i => (Promise[Unit], i) }

val published = AckedSource(data).
map(Message(_, Publisher.queue(Queue.passive("no-existe")))).
runWith(sink)
to(sink)

await(published)
published.run()
val Failure(ex) = Try(await(data.head._1.future))
ex.getMessage should include ("no queue 'no-existe'")
}
}
}


}
32 changes: 19 additions & 13 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import java.util.Properties

val json4sVersion = "3.6.6"
val circeVersion = "0.12.3"
val akkaVersion = "2.5.25"
val json4sVersion = "3.6.11"
val circeVersion = "0.14.1"
val akkaVersion = "2.6.17"
val playVersion = "2.7.4"

val appProperties = {
Expand All @@ -16,17 +16,18 @@ val assertNoApplicationConf = taskKey[Unit]("Makes sure application.conf isn't p
val commonSettings = Seq(
organization := "com.spingo",
version := appProperties.getProperty("version"),
scalaVersion := "2.12.10",
crossScalaVersions := Seq("2.12.10", "2.13.1"),
scalaVersion := "2.13.6",
crossScalaVersions := Seq("2.12.15", "2.13.6"),
libraryDependencies ++= Seq(
"com.chuusai" %% "shapeless" % "2.3.3",
"com.typesafe" % "config" % "1.3.4",
"com.newmotion" %% "akka-rabbitmq" % "5.1.2",
"org.slf4j" % "slf4j-api" % "1.7.26",
"ch.qos.logback" % "logback-classic" % "1.2.3" % "test",
"com.chuusai" %% "shapeless" % "2.3.7",
"com.typesafe" % "config" % "1.4.1",
"com.newmotion" %% "akka-rabbitmq" % "6.0.0",
"org.slf4j" % "slf4j-api" % "1.7.32",
"ch.qos.logback" % "logback-classic" % "1.2.5" % "test",
"org.scalatest" %% "scalatest" % "3.0.8" % "test",
"com.spingo" %% "scoped-fixtures" % "2.0.0" % "test",
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion % "test"
),
Expand Down Expand Up @@ -120,7 +121,7 @@ lazy val upickle = (project in file("./addons/upickle")).
libraryDependencies += "com.lihaoyi" %% "upickle" % (
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, 11)) => "0.7.4"
case _ => "0.8.0"
case _ => "1.2.2"
}
)).
dependsOn(core)
Expand All @@ -136,9 +137,14 @@ lazy val `akka-stream` = (project in file("./addons/akka-stream")).
settings(commonSettings: _*).
settings(
name := "op-rabbit-akka-stream",
// Temporarily depend on jitpack published version of acked-streams for scala 2.13
resolvers += "jitpack" at "https://jitpack.io",
libraryDependencies ++= Seq(
"com.timcharper" %% "acked-streams" % "2.1.1",
"com.typesafe.akka" %% "akka-stream" % akkaVersion),
// TODO: remove and switch to com.timcharper when https://github.com/timcharper/acked-stream/pull/10 gets merged and published
// "com.timcharper" %% "acked-streams" % "2.1.1",
"com.github.deal-engine.acked-stream" %% "acked-streams" % "5babfe7f85",
"com.typesafe.akka" %% "akka-stream" % akkaVersion
),
unmanagedResourceDirectories in Test ++= Seq(
file(".").getAbsoluteFile / "core" / "src" / "test" / "resources"),
unmanagedSourceDirectories in Test ++= Seq(
Expand Down
2 changes: 2 additions & 0 deletions jitpack.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
install:
- sbt '+ publishM2'