Skip to content

Commit 8cb16de

Browse files
author
Zen Yui
authored
Issue-111: add event and snapshot adapters (#116)
Converts lagom-pb wrapper protos to COS protos at rest (as part of decoupling us from lagom-pb)
1 parent f91520a commit 8cb16de

File tree

10 files changed

+336
-5
lines changed

10 files changed

+336
-5
lines changed

code/service/src/main/scala/com/namely/chiefofstate/Aggregate.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@ import com.google.protobuf.empty.Empty
55
import com.typesafe.config.Config
66
import io.superflat.lagompb.encryption.EncryptionAdapter
77
import io.superflat.lagompb.{AggregateRoot, CommandHandler, EventHandler}
8+
import akka.persistence.typed.PersistenceId
9+
import akka.persistence.typed.scaladsl.EventSourcedBehavior
10+
import io.superflat.lagompb.Command
11+
import io.superflat.lagompb.protobuf.v1.core.{EventWrapper, StateWrapper}
12+
import com.namely.chiefofstate.persistence.{CosEventAdapter, CosSnapshotAdapter}
813

914
/**
1015
* ChiefOfStateAggregate
@@ -24,4 +29,17 @@ class Aggregate(
2429

2530
override def aggregateName: String = "chiefOfState"
2631

32+
/**
33+
* generate the lagom-pb event-sourced behavior and inject event and
34+
* snapshot adapters
35+
*
36+
* @param persistenceId aggregate persistence ID
37+
* @return EventSourcedBehavior
38+
*/
39+
override def create(persistenceId: PersistenceId): EventSourcedBehavior[Command, EventWrapper, StateWrapper] = {
40+
super
41+
.create(persistenceId)
42+
.eventAdapter(CosEventAdapter)
43+
.snapshotAdapter(CosSnapshotAdapter)
44+
}
2745
}

code/service/src/main/scala/com/namely/chiefofstate/Util.scala

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,28 @@ object Util {
2222
/**
2323
* Converts the lagom-pb MetaData class to the chief-of-state MetaData
2424
*
25-
* @param metaData
26-
* @return
25+
* @param metaData lagom-pb MetaData
26+
* @return chief-of-state MetaData instance
2727
*/
2828
def toCosMetaData(metaData: MetaData): CosMetaData = {
2929
CosMetaData(
3030
entityId = metaData.entityId,
31-
// TODO: remove .toInt
32-
revisionNumber = metaData.revisionNumber.toInt,
31+
revisionNumber = metaData.revisionNumber,
32+
revisionDate = metaData.revisionDate,
33+
data = metaData.data
34+
)
35+
}
36+
37+
/**
38+
* Converts chief-of-state MetaData to lagom-pb MetaData
39+
*
40+
* @param metaData COS meta data
41+
* @return Lagom-pb MetaData instance
42+
*/
43+
def toLagompbMetaData(metaData: CosMetaData): MetaData = {
44+
MetaData(
45+
entityId = metaData.entityId,
46+
revisionNumber = metaData.revisionNumber,
3347
revisionDate = metaData.revisionDate,
3448
data = metaData.data
3549
)
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package com.namely.chiefofstate.persistence
2+
3+
import akka.persistence.typed.{EventAdapter, EventSeq}
4+
import io.superflat.lagompb.protobuf.v1.core.{EventWrapper => LagompbEventWrapper}
5+
import com.namely.protobuf.chiefofstate.v1.persistence.EventWrapper
6+
import com.namely.chiefofstate.Util
7+
8+
/**
9+
* Akka persistence event adaptor that converts lagom-pb event wrappers
10+
* to COS wrappers in preparation of dropping the lagom-pb dependency
11+
*/
12+
object CosEventAdapter extends EventAdapter[LagompbEventWrapper, EventWrapper] {
13+
14+
/**
15+
* convert lagom-pb EventWrapper to a cos EventWrapper
16+
*
17+
* @param e lagom-pb EventWrapper
18+
* @return cos EventWrapper instance
19+
*/
20+
def toJournal(e: LagompbEventWrapper): EventWrapper = {
21+
EventWrapper(
22+
event = e.event,
23+
resultingState = e.resultingState,
24+
meta = e.meta.map(Util.toCosMetaData)
25+
)
26+
}
27+
28+
/**
29+
* convert cos EventWrapper to a lagom-pb EventWrapper
30+
*
31+
* @param p cos EventWrapper
32+
* @param manifest the manifest used
33+
* @return lagom-pb EventWrapper instance
34+
*/
35+
def fromJournal(p: EventWrapper, manifest: String): EventSeq[LagompbEventWrapper] = {
36+
EventSeq(
37+
Seq(
38+
LagompbEventWrapper(
39+
event = p.event,
40+
resultingState = p.resultingState,
41+
meta = p.meta.map(Util.toLagompbMetaData)
42+
)
43+
)
44+
)
45+
}
46+
47+
val MANIFEST: String = "com.namely.chiefofstate.persistence.CosEventAdapter"
48+
49+
def manifest(event: LagompbEventWrapper): String = MANIFEST
50+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package com.namely.chiefofstate.persistence
2+
3+
import io.superflat.lagompb.protobuf.v1.core.{StateWrapper => LagompbStateWrapper}
4+
import com.namely.protobuf.chiefofstate.v1.persistence.StateWrapper
5+
import akka.persistence.typed.SnapshotAdapter
6+
import com.google.protobuf.any.Any
7+
import com.namely.chiefofstate.Util
8+
9+
/**
10+
* akka persistence SnapshotAdapter for converting to/from lagom-pb
11+
* state wrappers in anticipation of dropping that dependency
12+
*/
13+
object CosSnapshotAdapter extends SnapshotAdapter[LagompbStateWrapper] {
14+
15+
/**
16+
* convert lagom-pb state wrapper to a COS wrapper
17+
*
18+
* @param state lagom-pb state wrapper
19+
* @return COS state wrapper as a scala.Any
20+
*/
21+
def toJournal(state: LagompbStateWrapper): scala.Any = {
22+
StateWrapper(
23+
state = state.state,
24+
meta = state.meta.map(Util.toCosMetaData)
25+
)
26+
}
27+
28+
/**
29+
* convert COS state wrapper to a lagom-pb state wrapper
30+
*
31+
* @param from COS state wrapper as a scala.Any
32+
* @return lagom-pb StateWrapper
33+
*/
34+
def fromJournal(from: scala.Any): LagompbStateWrapper = {
35+
from match {
36+
case state: StateWrapper =>
37+
LagompbStateWrapper(
38+
state = state.state,
39+
meta = state.meta.map(Util.toLagompbMetaData)
40+
)
41+
42+
case x =>
43+
throw new Exception(s"snapshot adapter cannot unpack state of type ${from.getClass.getName}")
44+
}
45+
}
46+
47+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.namely.chiefofstate
2+
3+
import org.scalamock.scalatest.MockFactory
4+
import scala.util.Try
5+
import io.superflat.lagompb.encryption.EncryptionAdapter
6+
import akka.persistence.typed.PersistenceId
7+
import com.namely.chiefofstate.test.helpers.TestSpec
8+
import io.superflat.lagompb.{CommandHandler, EventHandler}
9+
10+
class AggregateSpec extends TestSpec with MockFactory {
11+
".create" should {
12+
"return EventSourcedBehavior with adapters" in {
13+
val cmdHandler: CommandHandler = mock[CommandHandler]
14+
val eventHandler: EventHandler = mock[EventHandler]
15+
val encryptionAdapter: EncryptionAdapter = mock[EncryptionAdapter]
16+
val agg = new Aggregate(null, null, cmdHandler, eventHandler, encryptionAdapter)
17+
val persistenceId: PersistenceId = PersistenceId("typeHint", "entityId")
18+
// TODO: find a real way to test this
19+
// unfortunately, Akka made the implementation case class private,
20+
// so there is no way to observe the eventAdatper and snapshotAdapter
21+
Try(agg.create(persistenceId)).isSuccess shouldBe (true)
22+
}
23+
}
24+
}

code/service/src/test/scala/com/namely/chiefofstate/UtilSpec.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,26 @@ class UtilSpec extends TestSpec {
3030
actual shouldBe (expected)
3131
}
3232
}
33+
34+
"toLagompbMetaData" should {
35+
"return the right lagom-pb MetaData" in {
36+
val ts = Timestamp().withSeconds(3L).withNanos(2)
37+
val revisionNumber = 2
38+
val data = Map("foo" -> Any.pack(Empty.defaultInstance))
39+
40+
val lagomMetaData = LagompbMetaData()
41+
.withRevisionNumber(revisionNumber)
42+
.withRevisionDate(ts)
43+
.withData(data)
44+
45+
val cosMetaData = CosMetaData()
46+
.withRevisionNumber(revisionNumber)
47+
.withRevisionDate(ts)
48+
.withData(data)
49+
50+
val actual = Util.toLagompbMetaData(cosMetaData)
51+
52+
actual shouldBe (lagomMetaData)
53+
}
54+
}
3355
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package com.namely.chiefofstate.persistence
2+
3+
import com.google.protobuf.any.Any
4+
import com.google.protobuf.wrappers.StringValue
5+
import com.namely.chiefofstate.test.helpers.TestSpec
6+
import com.namely.protobuf.chiefofstate.v1.common.{MetaData => CosMetaData}
7+
import com.namely.protobuf.chiefofstate.v1.persistence.EventWrapper
8+
import io.superflat.lagompb.protobuf.v1.core.{MetaData => LagompbMetaData}
9+
import io.superflat.lagompb.protobuf.v1.core.{EventWrapper => LagompbEventWrapper}
10+
11+
class CosEventAdapterSpec extends TestSpec {
12+
".toJournal" should {
13+
"return a cos event wrapper" in {
14+
val event = Any.pack(StringValue("event"))
15+
val state = Any.pack(StringValue("state"))
16+
val revision = 2
17+
18+
val before = LagompbEventWrapper()
19+
.withEvent(event)
20+
.withResultingState(state)
21+
.withMeta(LagompbMetaData().withRevisionNumber(revision))
22+
23+
val expected = EventWrapper()
24+
.withEvent(event)
25+
.withResultingState(state)
26+
.withMeta(CosMetaData().withRevisionNumber(revision))
27+
28+
val actual = CosEventAdapter.toJournal(before)
29+
30+
actual shouldBe (expected)
31+
}
32+
}
33+
34+
".fromJournal" should {
35+
"return a lagom-pb event wrapper" in {
36+
val event = Any.pack(StringValue("event"))
37+
val state = Any.pack(StringValue("state"))
38+
val revision = 2
39+
40+
val expected = LagompbEventWrapper()
41+
.withEvent(event)
42+
.withResultingState(state)
43+
.withMeta(LagompbMetaData().withRevisionNumber(revision))
44+
45+
val before = EventWrapper()
46+
.withEvent(event)
47+
.withResultingState(state)
48+
.withMeta(CosMetaData().withRevisionNumber(revision))
49+
50+
val actual = CosEventAdapter.fromJournal(before, "")
51+
actual.events.length shouldBe (1)
52+
actual.events.head shouldBe (expected)
53+
}
54+
}
55+
56+
".manifest" should {
57+
"yield the stable string" in {
58+
CosEventAdapter.manifest(null) shouldBe (CosEventAdapter.MANIFEST)
59+
}
60+
}
61+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.namely.chiefofstate.persistence
2+
3+
import com.google.protobuf.any.Any
4+
import com.google.protobuf.wrappers.StringValue
5+
import com.google.protobuf.timestamp.Timestamp
6+
import com.namely.chiefofstate.test.helpers.TestSpec
7+
import com.namely.chiefofstate.Util
8+
import com.namely.protobuf.chiefofstate.v1.common.{MetaData => CosMetaData}
9+
import com.namely.protobuf.chiefofstate.v1.persistence.StateWrapper
10+
import io.superflat.lagompb.protobuf.v1.core.{MetaData => LagompbMetaData}
11+
import io.superflat.lagompb.protobuf.v1.core.{StateWrapper => LagompbStateWrapper}
12+
13+
class CosSnapshotAdapterSpec extends TestSpec {
14+
".toJournal" should {
15+
"return a cos state wrapper" in {
16+
val state = Any.pack(StringValue("state"))
17+
val revision: Int = 2
18+
19+
val before = LagompbStateWrapper()
20+
.withState(state)
21+
.withMeta(LagompbMetaData().withRevisionNumber(revision))
22+
23+
val expected = StateWrapper()
24+
.withState(state)
25+
.withMeta(Util.toCosMetaData(before.getMeta))
26+
27+
val actual: StateWrapper = CosSnapshotAdapter
28+
.toJournal(before)
29+
.asInstanceOf[StateWrapper]
30+
31+
actual shouldBe (expected)
32+
}
33+
}
34+
35+
".fromJournal" should {
36+
"return a lagom-pb state wrapper" in {
37+
val state = Any.pack(StringValue("state"))
38+
val revision: Int = 2
39+
40+
val before = StateWrapper()
41+
.withState(state)
42+
.withMeta(CosMetaData().withRevisionNumber(revision))
43+
44+
val expected = LagompbStateWrapper()
45+
.withState(state)
46+
.withMeta(Util.toLagompbMetaData(before.getMeta))
47+
48+
val actual: LagompbStateWrapper = CosSnapshotAdapter
49+
.fromJournal(before)
50+
.asInstanceOf[LagompbStateWrapper]
51+
52+
actual shouldBe (expected)
53+
}
54+
"fail on unknown snapshot type" in {
55+
val failure = intercept[Exception] {
56+
CosSnapshotAdapter.fromJournal(StringValue("bad state"))
57+
}
58+
59+
failure.getMessage().startsWith("snapshot adapter cannot unpack state of type") shouldBe (true)
60+
}
61+
}
62+
}

project/Dependencies.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ object Dependencies {
55
// Package versions
66
object Versions {
77
val Scala213: String = "2.13.1"
8-
val LagomPbVersion: String = "1.0.2"
8+
val LagomPbVersion: String = "1.0.2+4-ccd75b98-SNAPSHOT"
99
val KanelaAgentVersion: String = "1.0.6"
1010
val SilencerVersion: String = "1.6.0"
1111
val KamonAkkaGrpcVersion: String = "0.0.9"

proto/internal/persistence.proto

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
syntax = "proto3";
2+
3+
package chief_of_state.v1;
4+
5+
option java_package = "com.namely.protobuf.chiefofstate.v1";
6+
option java_multiple_files = true;
7+
option java_outer_classname = "CosPersistenceProto";
8+
9+
import "chief_of_state/v1/common.proto";
10+
import "google/protobuf/any.proto";
11+
12+
// These protos are used by akka persistence to write to disk. They will
13+
// likely be offered as part of the open-source protos repo, but until
14+
// they are concrete/finalized, we are keeping them here.
15+
16+
// Wrap the aggregate state and the meta data.
17+
message StateWrapper {
18+
// the entity state
19+
google.protobuf.Any state = 1;
20+
// metadata from the event that made this state
21+
chief_of_state.v1.MetaData meta = 3;
22+
}
23+
24+
// EventWrapper is an event wrapper that holds both the
25+
// event and the corresponding aggregate root state.
26+
message EventWrapper {
27+
// the event emitted
28+
google.protobuf.Any event = 1;
29+
// the state obtained from processing the event
30+
google.protobuf.Any resulting_state = 2;
31+
// meta data
32+
chief_of_state.v1.MetaData meta = 3;
33+
}

0 commit comments

Comments
 (0)