Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ It is currently support the following alternative Akka Persistence and Akka Proj
* R2DBC (Postgres) as the event sourcing event store and the projection using R2DBC (Postgres)
* Cassandra as the event sourcing event store and the projection using JDBC (Postgres)
* JDBC (Postgres) as the event sourcing event store and the projection using JDBC (Postgres)
* R2DBC (Postgres) as the store and Akka gRPC projections into R2DBC (Postgres)

## Running a test locally

Expand Down Expand Up @@ -40,6 +41,16 @@ docker compose -f docker/docker-compose-jdbc-postgres.yml up --wait

Adjust the includes in `local.conf` to select JDBC.

### gRPC / R2DBC (Postgres)

Start a local PostgresSQL server on default port 5432:

```shell
docker compose -f docker/docker-compose-r2dbc-postgres.yml up --wait
```

Adjust the includes in `local.conf` to select 'local-grpc'.

### Run application

Start the application:
Expand Down
13 changes: 7 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
val AkkaVersion = "2.10.6"
val AkkaHttpVersion = "10.7.1"
val AkkaProjectionVersion = "1.6.13"
val AkkaManagementVersion = "1.6.2"
val AkkaVersion = "2.10.9"
val AkkaHttpVersion = "10.7.2"
val AkkaProjectionVersion = "1.6.14"
val AkkaManagementVersion = "1.6.3"
val AkkaPersistenceR2dbcVersion = "1.3.7"
val AkkaPersistenceDynamoDBVersion = "2.0.6"
val AkkaPersistenceJdbcVersion = "5.5.2"
val AkkaPersistenceDynamoDBVersion = "2.0.8"
val AkkaPersistenceJdbcVersion = "5.5.3"
val AkkaPersistenceCassandraVersion = "1.3.2"

ThisBuild / dynverSeparator := "-"
Expand Down Expand Up @@ -40,6 +40,7 @@ lazy val `akka-projection-testing` = project
"com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
"com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
"com.lightbend.akka" %% "akka-projection-eventsourced" % AkkaProjectionVersion,
"com.lightbend.akka" %% "akka-projection-grpc" % AkkaProjectionVersion,
"com.lightbend.akka" %% "akka-persistence-r2dbc" % AkkaPersistenceR2dbcVersion,
"com.lightbend.akka" %% "akka-projection-r2dbc" % AkkaProjectionVersion,
"com.lightbend.akka" %% "akka-persistence-dynamodb" % AkkaPersistenceDynamoDBVersion,
Expand Down
4 changes: 4 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ test {

#throttle-actors-per-second = 500
throttle-actors-per-second = off

# Define for gRPC projections to go via a load balancer/ingress something instead of connecting to other cluster nodes
service-dns-name = ""
service-dns-name = ${?SERVICE_DNS_NAME}
}


Expand Down
17 changes: 17 additions & 0 deletions src/main/resources/local-grpc.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
include "r2dbc"
include "local-shared"

akka.http.server.enable-http2 = on

akka.projection.grpc.consumer {
client {
host = "127.0.0.1"
host = ${?SERVICE_GRPC_HOST}
port = ${test.http.port}
port = ${?SERVICE_GRPC_PORT}
use-tls = false
}
stream-id = "events"
}

akka.projection.grpc.producer.query-plugin-id = "akka.persistence.r2dbc.query"
1 change: 1 addition & 0 deletions src/main/resources/local.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
include "local-r2dbc"
#include "local-cassandra"
#include "local-jdbc"
#include "local-grpc"
1 change: 1 addition & 0 deletions src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<!-- <logger name="akka.projection.r2dbc" level="DEBUG" />-->
<!-- <logger name="io.r2dbc.postgresql.QUERY" level="DEBUG" />-->
<!-- <logger name="io.r2dbc.pool" level="DEBUG" />-->
<!-- <logger name="akka.projection.grpc" level="DEBUG" />-->

<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>8192</queueSize>
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/akka/projection/testing/Guardian.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@

package akka.projection.testing

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
Expand All @@ -19,6 +16,9 @@ import akka.management.scaladsl.AkkaManagement
import akka.projection.ProjectionBehavior
import akka.projection.testing.simulation.Engine

import scala.concurrent.Await
import scala.concurrent.duration._

object Guardian {

def apply(shouldBootstrap: Boolean = false): Behavior[String] = {
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/akka/projection/testing/TestRoutes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
package akka.projection.testing

import java.util.UUID

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success

import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
Expand All @@ -19,6 +17,7 @@ import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.projection.testing.grpc.PublishEvents
import akka.projection.testing.simulation.Engine
import akka.projection.testing.simulation.SimulationJsonFormat
import akka.projection.testing.simulation.SimulationSettings
Expand Down Expand Up @@ -178,7 +177,8 @@ class TestRoutes(
complete(StatusCodes.Accepted)
}
}
})
},
handle(PublishEvents.eventProducerService(system)))
}

}
11 changes: 10 additions & 1 deletion src/main/scala/akka/projection/testing/TestSetup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package akka.projection.testing

import scala.concurrent.Future

import akka.Done
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
Expand All @@ -14,8 +13,10 @@ import akka.persistence.dynamodb.query.scaladsl.DynamoDBReadJournal
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import akka.projection.ProjectionBehavior
import akka.projection.grpc.consumer.javadsl.GrpcReadJournal
import akka.projection.testing.cassandra.CassandraTestSetup
import akka.projection.testing.dynamodb.DynamoDBTestSetup
import akka.projection.testing.grpc.GrpcTestSetup
import akka.projection.testing.jdbc.JdbcTestSetup
import akka.projection.testing.r2dbc.R2dbcTestSetup

Expand Down Expand Up @@ -45,8 +46,16 @@ object TestSetup {
override def readJournal: String = CassandraReadJournal.Identifier
}

case object GRPC extends Journal {
override val journalPluginId: String = "akka.persistence.r2dbc.journal"
override def readJournal: String = GrpcReadJournal.Identifier
}

def apply(settings: EventProcessorSettings)(implicit system: ActorSystem[_]): TestSetup = {
system.settings.config.getString("akka.persistence.journal.plugin") match {
// same as R2DBC so we need to check an additional config key
case TestSetup.GRPC.journalPluginId if GrpcTestSetup.isGrpcRun(system) =>
new GrpcTestSetup(settings)
case TestSetup.R2DBC.journalPluginId => new R2dbcTestSetup(settings)
case TestSetup.DynamoDB.journalPluginId => new DynamoDBTestSetup(settings)
case TestSetup.JDBC.journalPluginId => new JdbcTestSetup(settings)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (C) 2020 - 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.projection.testing.grpc

import akka.Done
import akka.actor.typed.ActorSystem
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.r2dbc.internal.R2dbcExecutor
import akka.persistence.r2dbc.internal.Sql.Interpolation
import akka.projection.ProjectionId
import akka.projection.r2dbc.scaladsl.R2dbcSession
import akka.projection.scaladsl.Handler
import akka.projection.testing.ConfigurablePersistentActor

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

class GrpcProjectionHandler(
val projectionId: ProjectionId,
projectionIndex: Int,
readOnly: Boolean,
val failEvery: Int,
executor: R2dbcExecutor)
extends Handler[EventEnvelope[ConfigurablePersistentActor.Event]] {

override def process(envelope: EventEnvelope[ConfigurablePersistentActor.Event]): Future[Done] =
if (readOnly)
Future.successful(Done)
else {
executor
.updateOne("insert") { connection =>
connection
.createStatement(sql"""
INSERT INTO events(name, projection_id, event)
VALUES (?, ?, ?)
ON CONFLICT (name, projection_id, event)
DO UPDATE SET updates = events.updates + 1;
""").bind(0, envelope.event.testName)
.bind(1, projectionIndex)
.bind(2, envelope.event.payload)
}
.map(_ => Done)(ExecutionContext.parasitic)
}
}
154 changes: 154 additions & 0 deletions src/main/scala/akka/projection/testing/grpc/GrpcTestSetup.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Copyright (C) 2020 - 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.projection.testing.grpc

import akka.Done
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.MemberStatus.Up
import akka.cluster.typed.Cluster
import akka.grpc.GrpcClientSettings
import akka.persistence.query.Offset
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.r2dbc.ConnectionFactoryProvider
import akka.persistence.r2dbc.internal.R2dbcExecutor
import akka.persistence.r2dbc.internal.Sql.Interpolation
import akka.projection.ProjectionBehavior
import akka.projection.ProjectionId
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.grpc.consumer.GrpcQuerySettings
import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal
import akka.projection.r2dbc.scaladsl.R2dbcProjection
import akka.projection.scaladsl.SourceProvider
import akka.projection.testing.ConfigurablePersistentActor
import akka.projection.testing.EventProcessorSettings
import akka.projection.testing.TestSetup
import org.slf4j.LoggerFactory

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.util.Random

/**
* Uses R2DBC for actual DB but gRPC for the projection
*/
object GrpcTestSetup {
def isGrpcRun(system: ActorSystem[_]): Boolean =
system.settings.config.getString("akka.projection.grpc.consumer.stream-id") != ""
}

class GrpcTestSetup(settings: EventProcessorSettings)(implicit system: ActorSystem[_]) extends TestSetup {
override val journal: TestSetup.Journal = TestSetup.GRPC
private val log = LoggerFactory.getLogger(this.getClass)

private val executor = new R2dbcExecutor(
ConnectionFactoryProvider(system).connectionFactoryFor("akka.persistence.r2dbc.connection-factory"),
log,
logDbCallsExceeding = 5.seconds,
closeCallsExceeding = None)(system.executionContext, system)

override def init(): Future[Done] =
Future.successful(Done)

override def reset(): Future[Done] = {
executor
.executeDdls(s"reset") { connection =>
Vector(connection.createStatement("TRUNCATE events"))
}
.map(_ => Done)(ExecutionContext.parasitic)
}

override def createProjection(projectionIndex: Int, sliceIndex: Int): Behavior[ProjectionBehavior.Command] = {

val grpcQuerySettings = GrpcQuerySettings(PublishEvents.streamId)

val portFromConfig = system.settings.config.getInt("test.http.port")
val (grpcHost: String, grpcPort: Int) =
if (Cluster(system).state.members.size == 1) (Cluster(system).selfMember.address.host.get, portFromConfig)
else if (!Cluster(system).selfMember.address.host.contains("127.0.0.1")) {
// poor man's way to detect if we are non-local/k8 - canonical is not localhost

val dnsName = system.settings.config.getString("test.service-dns-name")
if (dnsName != "") {
log.info("Using configured service dns entry for gRPC projection [{}:{}]", dnsName, portFromConfig)
// configured dns name to go via
(dnsName, portFromConfig)
} else {
// random other cluster node - directly to canonical ip
val hostsAndPorts = Cluster(system).state.members
.filter(_.status == Up)
.filterNot(_ == Cluster(system).selfMember)
.map(member => member.address.host.get -> portFromConfig)
val hostAndPort @ (host, port) = Random.shuffle(hostsAndPorts).head
log.info("Using random node ip for gRPC projection [{}:{}]", host, port)
hostAndPort
}
} else {
// random other cluster node
val hostsAndPorts = Cluster(system).state.members
.filter(_.status == Up)
.filterNot(_.address.port.contains(portFromConfig))
.map(member => member.address.host.get -> ("80" + member.address.port.get.toString.takeRight(2)).toInt)
val hostAndPort @ (host, port) = Random.shuffle(hostsAndPorts).head
log.info("Using random local port for gRPC projection [{}:{}]", host, port)
hostAndPort
}

val grpcClientSettings =
GrpcClientSettings.connectToServiceAt(grpcHost, grpcPort).withTls(false)

val eventsBySlicesQuery = GrpcReadJournal(grpcQuerySettings, grpcClientSettings, Nil)

val ranges = EventSourcedProvider.sliceRanges(system, journal.readJournal, settings.parallelism)
val sliceRange = ranges(sliceIndex)

val projectionId = ProjectionId(s"test-projection-id-$projectionIndex", s"${sliceRange.min}-${sliceRange.max}")

val sourceProvider: SourceProvider[Offset, EventEnvelope[ConfigurablePersistentActor.Event]] = EventSourcedProvider
.eventsBySlices(system, eventsBySlicesQuery, eventsBySlicesQuery.streamId, sliceRange.min, sliceRange.max)

ProjectionBehavior(
R2dbcProjection.atLeastOnceAsync(
projectionId,
None,
sourceProvider,
() =>
new GrpcProjectionHandler(projectionId, projectionIndex, settings.readOnly, settings.failEvery, executor)))
}

override def countEvents(testName: String, projectionId: Int): Future[Int] = {
executor
.selectOne("count events")(
_.createStatement(sql"SELECT count(*) FROM events WHERE name = ? AND projection_id = ?")
.bind(0, testName)
.bind(1, projectionId),
{ row =>
row.get(0, classOf[Integer]).intValue
})
.map(_.getOrElse(0))(ExecutionContext.parasitic)
}

override def writeResult(testName: String, result: String): Future[Done] = {
executor
.updateOne("write result")(
_.createStatement(sql"INSERT INTO results(name, result) VALUES (?, ?)").bind(0, testName).bind(1, result))
.map(_ => Done)(ExecutionContext.parasitic)
}

override def readResult(testName: String): Future[String] = {
executor
.selectOne("read result")(
_.createStatement(sql"SELECT result FROM results WHERE name = ?")
.bind(0, testName),
{ row =>
row.get(0, classOf[String])
})
.map(_.getOrElse("not finished"))(ExecutionContext.parasitic)
}

override def cleanUp(): Unit = ()
}
Loading
Loading