Skip to content

Commit 3c19acb

Browse files
committed
docs: improve EventStream doc
1 parent adda9b4 commit 3c19acb

File tree

3 files changed

+107
-34
lines changed

3 files changed

+107
-34
lines changed

akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java

Lines changed: 61 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,66 +4,78 @@
44

55
package akka.actor.typed.eventstream;
66

7-
// #imports
7+
import static akka.actor.typed.javadsl.Adapter.spawn;
8+
import static akka.actor.typed.javadsl.Adapter.toClassic;
9+
10+
import akka.actor.Actor;
811
import akka.actor.AllDeadLetters;
912
import akka.actor.SuppressedDeadLetter;
13+
import akka.actor.Terminated;
1014
import akka.actor.testkit.typed.javadsl.ActorTestKit;
1115
import akka.actor.testkit.typed.javadsl.TestProbe;
1216
import akka.actor.typed.Behavior;
1317
import akka.actor.typed.Props;
1418
import akka.actor.typed.SpawnProtocol;
1519
import akka.actor.typed.SpawnProtocol.Spawn;
1620
import akka.actor.typed.eventstream.EventStream.Publish;
17-
import akka.actor.typed.eventstream.EventStream.Subscribe;
18-
import akka.actor.typed.javadsl.AbstractBehavior;
19-
import akka.actor.typed.javadsl.ActorContext;
2021
import akka.actor.typed.javadsl.AskPattern;
21-
import akka.actor.typed.javadsl.Behaviors;
22-
import akka.actor.typed.javadsl.Receive;
23-
import akka.testkit.javadsl.TestKit;
2422
import java.time.Duration;
2523
import java.util.concurrent.CompletionStage;
24+
import org.junit.Assert;
2625
import org.junit.Test;
2726
import org.scalatestplus.junit.JUnitSuite;
2827
// #imports-deadletter
2928
import akka.actor.DeadLetter;
3029
import akka.actor.typed.ActorRef;
3130
import akka.actor.typed.ActorSystem;
31+
import akka.actor.typed.eventstream.EventStream.Subscribe;
32+
import akka.actor.typed.javadsl.AbstractBehavior;
33+
import akka.actor.typed.javadsl.ActorContext;
34+
import akka.actor.typed.javadsl.Behaviors;
35+
import akka.actor.typed.javadsl.Receive;
36+
import org.slf4j.Logger;
3237
// #imports-deadletter
3338

3439
public class LoggingDocTest extends JUnitSuite {
3540

3641
@Test
3742
public void subscribeToDeadLetters() {
38-
// #deadletters
39-
ActorSystem<DeadLetter> system = ActorSystem.create(Behaviors.empty(), "DeadLetters");
40-
system.eventStream().tell(new Subscribe<>(DeadLetter.class, system));
41-
// #deadletters
43+
ActorSystem<SpawnProtocol.Command> system = ActorSystem.create(SpawnProtocol.create(),
44+
"DeadLettersSystem");
45+
// #subscribe-deadletter
46+
ActorRef<DeadLetter> deadLetters = spawn(toClassic(system), DeadLetterActor.create(),
47+
"DeadLetters");
48+
system.eventStream().tell(new Subscribe<>(DeadLetter.class, deadLetters));
49+
// #subscribe-deadletter
4250
ActorTestKit.shutdown(system);
4351
}
4452

4553
public
4654
// #deadletter-actor
47-
static class DeadLetterActor extends AbstractBehavior<String> {
55+
static class DeadLetterActor extends AbstractBehavior<DeadLetter> {
56+
57+
final Logger log = getContext().getLog();
4858

49-
public static Behavior<String> create() {
59+
public static Behavior<DeadLetter> create() {
5060
return Behaviors.setup(DeadLetterActor::new);
5161
}
5262

53-
public DeadLetterActor(ActorContext<String> context) {
63+
public DeadLetterActor(ActorContext<DeadLetter> context) {
5464
super(context);
5565
ActorRef<DeadLetter> messageAdapter = context.messageAdapter(
5666
DeadLetter.class,
57-
d -> d.message().toString()
67+
d -> d
5868
);
69+
// subscribe DeadLetter at start up.
5970
context.getSystem().eventStream()
6071
.tell(new Subscribe<>(DeadLetter.class, messageAdapter));
6172
}
6273

6374
@Override
64-
public Receive<String> createReceive() {
65-
return newReceiveBuilder().onMessage(String.class, msg -> {
66-
System.out.println(msg);
75+
public Receive<DeadLetter> createReceive() {
76+
return newReceiveBuilder().onMessage(DeadLetter.class, msg -> {
77+
log.info("receive dead letter: {} from <{}> to <{}>", msg, msg.sender(),
78+
msg.recipient());
6779
return Behaviors.same();
6880
}).build();
6981
}
@@ -95,6 +107,8 @@ public Electronic(String artist) {
95107

96108
static class Listener extends AbstractBehavior<AllKindsOfMusic> {
97109

110+
final Logger log = getContext().getLog();
111+
98112
public static Behavior<AllKindsOfMusic> create() {
99113
return Behaviors.setup(Listener::new);
100114
}
@@ -108,15 +122,13 @@ public Listener(ActorContext<AllKindsOfMusic> context) {
108122
public Receive<AllKindsOfMusic> createReceive() {
109123
return newReceiveBuilder()
110124
.onMessage(Jazz.class, msg -> {
111-
System.out.printf("%s is listening to: %s%n",
112-
getContext().getSelf().path().name(),
125+
log.info("{} is listening to Jazz: {}", getContext().getSelf().path().name(),
113126
msg);
114127
return Behaviors.same();
115128
})
116129
.onMessage(Electronic.class, msg -> {
117-
System.out.printf("%s is listening to: %s%n",
118-
getContext().getSelf().path().name(),
119-
msg);
130+
log.info("{} is listening to Electronic: {}",
131+
getContext().getSelf().path().name(), msg);
120132
return Behaviors.same();
121133
}).build();
122134
}
@@ -168,6 +180,15 @@ public void subscribeToSuppressedDeadLetters() {
168180
// #suppressed-deadletters
169181
system.eventStream().tell(new Subscribe<>(SuppressedDeadLetter.class, actor));
170182
// #suppressed-deadletters
183+
Terminated suppression = Terminated.apply(Actor.noSender(), false, false);
184+
SuppressedDeadLetter deadLetter = SuppressedDeadLetter.apply(suppression, Actor.noSender(),
185+
Actor.noSender());
186+
system.eventStream().tell(new Publish<>(deadLetter));
187+
188+
SuppressedDeadLetter suppressedDeadLetter = probe.expectMessageClass(
189+
SuppressedDeadLetter.class);
190+
Assert.assertNotNull(suppressedDeadLetter);
191+
Assert.assertNotEquals(suppression, suppressedDeadLetter.message());
171192

172193
ActorTestKit.shutdown(system);
173194
}
@@ -181,6 +202,23 @@ public void subscribeToAllDeadLetters() {
181202
system.eventStream().tell(new Subscribe<>(AllDeadLetters.class, actor));
182203
// #all-deadletters
183204

205+
Terminated suppression = Terminated.apply(Actor.noSender(), false, false);
206+
SuppressedDeadLetter suppressedDeadLetter = SuppressedDeadLetter.apply(suppression,
207+
Actor.noSender(),
208+
Actor.noSender());
209+
system.eventStream().tell(new Publish<>(suppressedDeadLetter));
210+
DeadLetter deadLetter = DeadLetter.apply("deadLetter", Actor.noSender(), Actor.noSender());
211+
system.eventStream().tell(new Publish<>(deadLetter));
212+
213+
// both of the following messages will be received by the subscription actor
214+
SuppressedDeadLetter receiveSuppressed = probe.expectMessageClass(
215+
SuppressedDeadLetter.class);
216+
Assert.assertNotNull(receiveSuppressed);
217+
Assert.assertNotEquals(suppression, receiveSuppressed.message());
218+
DeadLetter receiveDeadLetter = probe.expectMessageClass(DeadLetter.class);
219+
Assert.assertNotNull(receiveDeadLetter);
220+
Assert.assertNotEquals(deadLetter.message(), receiveDeadLetter.message());
221+
184222
ActorTestKit.shutdown(system);
185223
}
186224
}

akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
package akka.actor.typed.eventstream
66

7+
import akka.actor.Actor
78
import akka.actor.DeadLetter
9+
import akka.actor.Terminated
810
import akka.actor.testkit.typed.scaladsl.LogCapturing
911
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
1012
import akka.actor.testkit.typed.scaladsl.TestProbe
@@ -30,14 +32,18 @@ object LoggingDocSpec {
3032

3133
object DeadLetterListener {
3234

33-
def apply(): Behavior[String] = Behaviors.setup { context =>
34-
// subscribe DeadLetter at startup.
35-
val adapter = context.messageAdapter[DeadLetter](d => d.message.toString)
35+
def apply(): Behavior[DeadLetter] = Behaviors.setup { context =>
36+
// subscribe DeadLetter at start up.
37+
val adapter = context.messageAdapter[DeadLetter](d => d)
3638
context.system.eventStream ! Subscribe(adapter)
3739

3840
Behaviors.receiveMessage {
39-
case msg: String =>
40-
println(msg)
41+
case deadLetter: DeadLetter =>
42+
context.log.info(
43+
"receive dead letter: {} from <{}> to <{}>",
44+
deadLetter.message,
45+
deadLetter.sender,
46+
deadLetter.recipient)
4147
Behaviors.same
4248
}
4349
}
@@ -71,15 +77,25 @@ class LoggingDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
7177
import LoggingDocSpec._
7278
import akka.actor.typed.scaladsl.AskPattern._
7379

74-
"allow registration to dead letters" in {
80+
"allow registration to dead letters from start up" in {
7581
// #deadletters
76-
ActorSystem(Behaviors.setup[Void] { context =>
82+
val system = ActorSystem(Behaviors.setup[Void] { context =>
7783
context.spawn(DeadLetterListener(), "DeadLetterListener", Props.empty)
7884
Behaviors.empty
7985
}, "System")
8086
// #deadletters
8187
}
8288

89+
"allow registration to dead letters" in {
90+
// #subscribe-deadletter
91+
ActorSystem(Behaviors.setup[Void] { context =>
92+
val deadLetterListener = context.spawn(DeadLetterListener(), "DeadLetterListener", Props.empty)
93+
context.system.eventStream ! Subscribe[DeadLetter](deadLetterListener)
94+
Behaviors.empty
95+
}, "System")
96+
// #subscribe-deadletter
97+
}
98+
8399
"demonstrate superclass subscriptions on typed eventStream" in {
84100
import LoggingDocSpec.ListenerActor._
85101
//#superclass-subscription-eventstream
@@ -106,17 +122,27 @@ class LoggingDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
106122
}
107123

108124
"allow registration to suppressed dead letters" in {
109-
val listener: ActorRef[Any] = TestProbe().ref
125+
val probe: TestProbe[Any] = TestProbe()
126+
val listener: ActorRef[Any] = probe.ref
110127

111128
//#suppressed-deadletters
112129
import akka.actor.SuppressedDeadLetter
113130
system.eventStream ! Subscribe[SuppressedDeadLetter](listener)
114131
//#suppressed-deadletters
132+
val suppression = Terminated(Actor.noSender)(existenceConfirmed = false, addressTerminated = false)
133+
val suppressionDeadLetter = SuppressedDeadLetter(suppression, Actor.noSender, Actor.noSender)
134+
system.eventStream ! Publish(suppressionDeadLetter)
135+
136+
val receivedSuppression = probe.expectMessageType[SuppressedDeadLetter]
137+
receivedSuppression should equal(suppressionDeadLetter)
115138

116139
//#all-deadletters
117140
import akka.actor.AllDeadLetters
118141
system.eventStream ! Subscribe[AllDeadLetters](listener)
119142
//#all-deadletters
143+
val deadLetter = DeadLetter("deadLetter", Actor.noSender, Actor.noSender)
144+
val receivedDeadLetter = probe.expectMessageType[DeadLetter]
145+
receivedDeadLetter should equal(deadLetter)
120146
}
121147

122148
}

akka-docs/src/main/paradox/typed/event-stream.md

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ It uses @ref:[Subchannel Classification](#subchannel-classification) which enabl
3636

3737
## How to use
3838

39-
The following example demonstrates how a subscription works. Given an actor:
39+
The following example demonstrates how a subscription works. Given an actor will subscribe DeadLetter from start up:
4040

4141
Scala
4242
: @@snip [LoggingDocSpec.scala](/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala) { #deadletters }
@@ -45,15 +45,24 @@ Java
4545
: @@snip [LoggingDocTest.java](/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java) { #imports-deadletter }
4646

4747

48+
@@@ div { .group-scala }
49+
50+
Or you can also subscribe after Actor starts:
51+
52+
@@snip [LoggingDocSpec.scala](/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala) { #subscribe-deadletter }
53+
54+
@@@
55+
56+
4857
@@@ div { .group-java }
4958

5059
the actor definition like this:
5160

5261
@@snip [LoggingDocTest.java](/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java) { #deadletter-actor }
5362

54-
it can be subscribed like this:
63+
Or you can also subscribe after Actor starts:
5564

56-
@@snip [LoggingDocTest.java](/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java) { #deadletters }
65+
@@snip [LoggingDocTest.java](/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java) { #subscribe-deadletter }
5766

5867
@@@
5968

0 commit comments

Comments
 (0)