From e8be86e0afd23905e70875841a39877e364f368a Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Wed, 20 Mar 2024 11:47:55 +0800 Subject: [PATCH 1/6] docs: improve EventStream doc --- .../typed/eventstream/LoggingDocTest.java | 84 ++++++++++++++----- .../typed/eventstream/LoggingDocSpec.scala | 41 +++++++-- .../src/main/paradox/typed/event-stream.md | 15 +++- 3 files changed, 107 insertions(+), 33 deletions(-) diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java index 6afdc1b1862..64b5c86c930 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java @@ -4,9 +4,13 @@ package akka.actor.typed.eventstream; -// #imports +import static akka.actor.typed.javadsl.Adapter.spawn; +import static akka.actor.typed.javadsl.Adapter.toClassic; + +import akka.actor.Actor; import akka.actor.AllDeadLetters; import akka.actor.SuppressedDeadLetter; +import akka.actor.Terminated; import akka.actor.testkit.typed.javadsl.ActorTestKit; import akka.actor.testkit.typed.javadsl.TestProbe; import akka.actor.typed.Behavior; @@ -14,56 +18,64 @@ import akka.actor.typed.SpawnProtocol; import akka.actor.typed.SpawnProtocol.Spawn; import akka.actor.typed.eventstream.EventStream.Publish; -import akka.actor.typed.eventstream.EventStream.Subscribe; -import akka.actor.typed.javadsl.AbstractBehavior; -import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.AskPattern; -import akka.actor.typed.javadsl.Behaviors; -import akka.actor.typed.javadsl.Receive; -import akka.testkit.javadsl.TestKit; import java.time.Duration; import java.util.concurrent.CompletionStage; +import org.junit.Assert; import org.junit.Test; import org.scalatestplus.junit.JUnitSuite; // #imports-deadletter import akka.actor.DeadLetter; import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; +import akka.actor.typed.eventstream.EventStream.Subscribe; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; +import org.slf4j.Logger; // #imports-deadletter public class LoggingDocTest extends JUnitSuite { @Test public void subscribeToDeadLetters() { - // #deadletters - ActorSystem system = ActorSystem.create(Behaviors.empty(), "DeadLetters"); - system.eventStream().tell(new Subscribe<>(DeadLetter.class, system)); - // #deadletters + ActorSystem system = ActorSystem.create(SpawnProtocol.create(), + "DeadLettersSystem"); + // #subscribe-deadletter + ActorRef deadLetters = spawn(toClassic(system), DeadLetterActor.create(), + "DeadLetters"); + system.eventStream().tell(new Subscribe<>(DeadLetter.class, deadLetters)); + // #subscribe-deadletter ActorTestKit.shutdown(system); } public // #deadletter-actor - static class DeadLetterActor extends AbstractBehavior { + static class DeadLetterActor extends AbstractBehavior { + + final Logger log = getContext().getLog(); - public static Behavior create() { + public static Behavior create() { return Behaviors.setup(DeadLetterActor::new); } - public DeadLetterActor(ActorContext context) { + public DeadLetterActor(ActorContext context) { super(context); ActorRef messageAdapter = context.messageAdapter( DeadLetter.class, - d -> d.message().toString() + d -> d ); + // subscribe DeadLetter at start up. context.getSystem().eventStream() .tell(new Subscribe<>(DeadLetter.class, messageAdapter)); } @Override - public Receive createReceive() { - return newReceiveBuilder().onMessage(String.class, msg -> { - System.out.println(msg); + public Receive createReceive() { + return newReceiveBuilder().onMessage(DeadLetter.class, msg -> { + log.info("receive dead letter: {} from <{}> to <{}>", msg, msg.sender(), + msg.recipient()); return Behaviors.same(); }).build(); } @@ -95,6 +107,8 @@ public Electronic(String artist) { static class Listener extends AbstractBehavior { + final Logger log = getContext().getLog(); + public static Behavior create() { return Behaviors.setup(Listener::new); } @@ -108,15 +122,13 @@ public Listener(ActorContext context) { public Receive createReceive() { return newReceiveBuilder() .onMessage(Jazz.class, msg -> { - System.out.printf("%s is listening to: %s%n", - getContext().getSelf().path().name(), + log.info("{} is listening to Jazz: {}", getContext().getSelf().path().name(), msg); return Behaviors.same(); }) .onMessage(Electronic.class, msg -> { - System.out.printf("%s is listening to: %s%n", - getContext().getSelf().path().name(), - msg); + log.info("{} is listening to Electronic: {}", + getContext().getSelf().path().name(), msg); return Behaviors.same(); }).build(); } @@ -168,6 +180,15 @@ public void subscribeToSuppressedDeadLetters() { // #suppressed-deadletters system.eventStream().tell(new Subscribe<>(SuppressedDeadLetter.class, actor)); // #suppressed-deadletters + Terminated suppression = Terminated.apply(Actor.noSender(), false, false); + SuppressedDeadLetter deadLetter = SuppressedDeadLetter.apply(suppression, Actor.noSender(), + Actor.noSender()); + system.eventStream().tell(new Publish<>(deadLetter)); + + SuppressedDeadLetter suppressedDeadLetter = probe.expectMessageClass( + SuppressedDeadLetter.class); + Assert.assertNotNull(suppressedDeadLetter); + Assert.assertNotEquals(suppression, suppressedDeadLetter.message()); ActorTestKit.shutdown(system); } @@ -181,6 +202,23 @@ public void subscribeToAllDeadLetters() { system.eventStream().tell(new Subscribe<>(AllDeadLetters.class, actor)); // #all-deadletters + Terminated suppression = Terminated.apply(Actor.noSender(), false, false); + SuppressedDeadLetter suppressedDeadLetter = SuppressedDeadLetter.apply(suppression, + Actor.noSender(), + Actor.noSender()); + system.eventStream().tell(new Publish<>(suppressedDeadLetter)); + DeadLetter deadLetter = DeadLetter.apply("deadLetter", Actor.noSender(), Actor.noSender()); + system.eventStream().tell(new Publish<>(deadLetter)); + + // both of the following messages will be received by the subscription actor + SuppressedDeadLetter receiveSuppressed = probe.expectMessageClass( + SuppressedDeadLetter.class); + Assert.assertNotNull(receiveSuppressed); + Assert.assertNotEquals(suppression, receiveSuppressed.message()); + DeadLetter receiveDeadLetter = probe.expectMessageClass(DeadLetter.class); + Assert.assertNotNull(receiveDeadLetter); + Assert.assertNotEquals(deadLetter.message(), receiveDeadLetter.message()); + ActorTestKit.shutdown(system); } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala index db093f6dc8e..7af2d5d9b75 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala @@ -4,7 +4,9 @@ package akka.actor.typed.eventstream +import akka.actor.Actor import akka.actor.DeadLetter +import akka.actor.Terminated import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.testkit.typed.scaladsl.TestProbe @@ -24,20 +26,25 @@ import scala.concurrent.Future object LoggingDocSpec { //#deadletters + import akka.actor.DeadLetter import akka.actor.typed.Behavior import akka.actor.typed.eventstream.EventStream.Subscribe import akka.actor.typed.scaladsl.Behaviors object DeadLetterListener { - def apply(): Behavior[String] = Behaviors.setup { context => - // subscribe DeadLetter at startup. - val adapter = context.messageAdapter[DeadLetter](d => d.message.toString) + def apply(): Behavior[DeadLetter] = Behaviors.setup { context => + // subscribe DeadLetter at start up. + val adapter = context.messageAdapter[DeadLetter](d => d) context.system.eventStream ! Subscribe(adapter) Behaviors.receiveMessage { - case msg: String => - println(msg) + case deadLetter: DeadLetter => + context.log.info( + "receive dead letter: {} from <{}> to <{}>", + deadLetter.message, + deadLetter.sender, + deadLetter.recipient) Behaviors.same } } @@ -71,7 +78,7 @@ class LoggingDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with import LoggingDocSpec._ import akka.actor.typed.scaladsl.AskPattern._ - "allow registration to dead letters" in { + "allow registration to dead letters from start up" in { // #deadletters ActorSystem(Behaviors.setup[Void] { context => context.spawn(DeadLetterListener(), "DeadLetterListener", Props.empty) @@ -80,6 +87,16 @@ class LoggingDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with // #deadletters } + "allow registration to dead letters" in { + // #subscribe-deadletter + ActorSystem(Behaviors.setup[Void] { context => + val deadLetterListener = context.spawn(DeadLetterListener(), "DeadLetterListener", Props.empty) + context.system.eventStream ! Subscribe[DeadLetter](deadLetterListener) + Behaviors.empty + }, "System") + // #subscribe-deadletter + } + "demonstrate superclass subscriptions on typed eventStream" in { import LoggingDocSpec.ListenerActor._ //#superclass-subscription-eventstream @@ -106,17 +123,27 @@ class LoggingDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with } "allow registration to suppressed dead letters" in { - val listener: ActorRef[Any] = TestProbe().ref + val probe: TestProbe[Any] = TestProbe() + val listener: ActorRef[Any] = probe.ref //#suppressed-deadletters import akka.actor.SuppressedDeadLetter system.eventStream ! Subscribe[SuppressedDeadLetter](listener) //#suppressed-deadletters + val suppression = Terminated(Actor.noSender)(existenceConfirmed = false, addressTerminated = false) + val suppressionDeadLetter = SuppressedDeadLetter(suppression, Actor.noSender, Actor.noSender) + system.eventStream ! Publish(suppressionDeadLetter) + + val receivedSuppression = probe.expectMessageType[SuppressedDeadLetter] + receivedSuppression should equal(suppressionDeadLetter) //#all-deadletters import akka.actor.AllDeadLetters system.eventStream ! Subscribe[AllDeadLetters](listener) //#all-deadletters + val deadLetter = DeadLetter("deadLetter", Actor.noSender, Actor.noSender) + val receivedDeadLetter = probe.expectMessageType[DeadLetter] + receivedDeadLetter should equal(deadLetter) } } diff --git a/akka-docs/src/main/paradox/typed/event-stream.md b/akka-docs/src/main/paradox/typed/event-stream.md index 52635b1589d..501183e2a1a 100644 --- a/akka-docs/src/main/paradox/typed/event-stream.md +++ b/akka-docs/src/main/paradox/typed/event-stream.md @@ -36,7 +36,7 @@ It uses @ref:[Subchannel Classification](#subchannel-classification) which enabl ## How to use -The following example demonstrates how a subscription works. Given an actor: +The following example demonstrates how a subscription works. Given an actor will subscribe DeadLetter from start up: Scala : @@snip [LoggingDocSpec.scala](/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala) { #deadletters } @@ -45,15 +45,24 @@ Java : @@snip [LoggingDocTest.java](/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java) { #imports-deadletter } +@@@ div { .group-scala } + +Or you can also subscribe after Actor starts: + +@@snip [LoggingDocSpec.scala](/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala) { #subscribe-deadletter } + +@@@ + + @@@ div { .group-java } the actor definition like this: @@snip [LoggingDocTest.java](/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java) { #deadletter-actor } -it can be subscribed like this: +Or you can also subscribe after Actor starts: -@@snip [LoggingDocTest.java](/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java) { #deadletters } +@@snip [LoggingDocTest.java](/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java) { #subscribe-deadletter } @@@ From 86cd0424aa1ca6422ea1e361574e53477e4afb62 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Tue, 26 Mar 2024 17:08:52 +0800 Subject: [PATCH 2/6] faster compile matcher & not null deadLetter --- .../typed/eventstream/LoggingDocTest.java | 22 ++++++++++--------- .../typed/eventstream/LoggingDocSpec.scala | 13 ++++++----- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java index 64b5c86c930..bfd25efae60 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java @@ -18,6 +18,7 @@ import akka.actor.typed.SpawnProtocol; import akka.actor.typed.SpawnProtocol.Spawn; import akka.actor.typed.eventstream.EventStream.Publish; +import akka.actor.typed.javadsl.Adapter; import akka.actor.typed.javadsl.AskPattern; import java.time.Duration; import java.util.concurrent.CompletionStage; @@ -176,13 +177,13 @@ public void subscribeBySubclassification() { public void subscribeToSuppressedDeadLetters() { ActorSystem system = ActorSystem.create(Behaviors.empty(), "SuppressedDeadLetter"); TestProbe probe = TestProbe.create(system); - ActorRef actor = probe.ref(); + ActorRef listener = probe.ref(); + akka.actor.ActorRef mockRef = Adapter.toClassic(listener); // #suppressed-deadletters - system.eventStream().tell(new Subscribe<>(SuppressedDeadLetter.class, actor)); + system.eventStream().tell(new Subscribe<>(SuppressedDeadLetter.class, listener)); // #suppressed-deadletters - Terminated suppression = Terminated.apply(Actor.noSender(), false, false); - SuppressedDeadLetter deadLetter = SuppressedDeadLetter.apply(suppression, Actor.noSender(), - Actor.noSender()); + Terminated suppression = Terminated.apply(mockRef, false, false); + SuppressedDeadLetter deadLetter = SuppressedDeadLetter.apply(suppression, mockRef, mockRef); system.eventStream().tell(new Publish<>(deadLetter)); SuppressedDeadLetter suppressedDeadLetter = probe.expectMessageClass( @@ -197,17 +198,18 @@ public void subscribeToSuppressedDeadLetters() { public void subscribeToAllDeadLetters() { ActorSystem system = ActorSystem.create(Behaviors.empty(), "AllDeadLetters"); TestProbe probe = TestProbe.create(system); - ActorRef actor = probe.ref(); + ActorRef listener = probe.ref(); + akka.actor.ActorRef mockRef = Adapter.toClassic(listener); // #all-deadletters - system.eventStream().tell(new Subscribe<>(AllDeadLetters.class, actor)); + system.eventStream().tell(new Subscribe<>(AllDeadLetters.class, listener)); // #all-deadletters Terminated suppression = Terminated.apply(Actor.noSender(), false, false); SuppressedDeadLetter suppressedDeadLetter = SuppressedDeadLetter.apply(suppression, - Actor.noSender(), - Actor.noSender()); + mockRef, + mockRef); system.eventStream().tell(new Publish<>(suppressedDeadLetter)); - DeadLetter deadLetter = DeadLetter.apply("deadLetter", Actor.noSender(), Actor.noSender()); + DeadLetter deadLetter = DeadLetter.apply("deadLetter", mockRef, mockRef); system.eventStream().tell(new Publish<>(deadLetter)); // both of the following messages will be received by the subscription actor diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala index 7af2d5d9b75..62d19b4e044 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala @@ -4,7 +4,6 @@ package akka.actor.typed.eventstream -import akka.actor.Actor import akka.actor.DeadLetter import akka.actor.Terminated import akka.actor.testkit.typed.scaladsl.LogCapturing @@ -17,6 +16,7 @@ import akka.actor.typed.SpawnProtocol import akka.actor.typed.SpawnProtocol.Spawn import akka.actor.typed.eventstream.EventStream.Publish import akka.actor.typed.eventstream.EventStream.Subscribe +import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.Behaviors import org.scalatest.wordspec.AnyWordSpecLike @@ -125,25 +125,26 @@ class LoggingDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with "allow registration to suppressed dead letters" in { val probe: TestProbe[Any] = TestProbe() val listener: ActorRef[Any] = probe.ref + val mockRef = listener.toClassic //#suppressed-deadletters import akka.actor.SuppressedDeadLetter system.eventStream ! Subscribe[SuppressedDeadLetter](listener) //#suppressed-deadletters - val suppression = Terminated(Actor.noSender)(existenceConfirmed = false, addressTerminated = false) - val suppressionDeadLetter = SuppressedDeadLetter(suppression, Actor.noSender, Actor.noSender) + val suppression = Terminated(mockRef)(existenceConfirmed = false, addressTerminated = false) + val suppressionDeadLetter = SuppressedDeadLetter(suppression, mockRef, mockRef) system.eventStream ! Publish(suppressionDeadLetter) val receivedSuppression = probe.expectMessageType[SuppressedDeadLetter] - receivedSuppression should equal(suppressionDeadLetter) + receivedSuppression shouldBe suppressionDeadLetter //#all-deadletters import akka.actor.AllDeadLetters system.eventStream ! Subscribe[AllDeadLetters](listener) //#all-deadletters - val deadLetter = DeadLetter("deadLetter", Actor.noSender, Actor.noSender) + val deadLetter = DeadLetter("deadLetter", mockRef, mockRef) val receivedDeadLetter = probe.expectMessageType[DeadLetter] - receivedDeadLetter should equal(deadLetter) + receivedDeadLetter shouldBe deadLetter } } From caf0af11d13adc4575a3f17cd4b8500a57ae5948 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Tue, 26 Mar 2024 17:40:55 +0800 Subject: [PATCH 3/6] chore unit test fixes --- .../akka/actor/typed/eventstream/LoggingDocTest.java | 12 +++++++----- .../actor/typed/eventstream/LoggingDocSpec.scala | 1 + 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java index bfd25efae60..71ac228a61b 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java @@ -44,8 +44,10 @@ public void subscribeToDeadLetters() { ActorSystem system = ActorSystem.create(SpawnProtocol.create(), "DeadLettersSystem"); // #subscribe-deadletter - ActorRef deadLetters = spawn(toClassic(system), DeadLetterActor.create(), - "DeadLetters"); + CompletionStage> spawnActorFuture = AskPattern.ask(system, + r -> new Spawn<>(DeadLetterActor.create(), "DeadLetters", Props.empty(), r), + Duration.ofSeconds(3), system.scheduler()); + ActorRef deadLetters = spawnActorFuture.toCompletableFuture().join(); system.eventStream().tell(new Subscribe<>(DeadLetter.class, deadLetters)); // #subscribe-deadletter ActorTestKit.shutdown(system); @@ -189,7 +191,7 @@ public void subscribeToSuppressedDeadLetters() { SuppressedDeadLetter suppressedDeadLetter = probe.expectMessageClass( SuppressedDeadLetter.class); Assert.assertNotNull(suppressedDeadLetter); - Assert.assertNotEquals(suppression, suppressedDeadLetter.message()); + Assert.assertEquals(deadLetter, suppressedDeadLetter); ActorTestKit.shutdown(system); } @@ -216,10 +218,10 @@ public void subscribeToAllDeadLetters() { SuppressedDeadLetter receiveSuppressed = probe.expectMessageClass( SuppressedDeadLetter.class); Assert.assertNotNull(receiveSuppressed); - Assert.assertNotEquals(suppression, receiveSuppressed.message()); + Assert.assertEquals(suppressedDeadLetter, receiveSuppressed); DeadLetter receiveDeadLetter = probe.expectMessageClass(DeadLetter.class); Assert.assertNotNull(receiveDeadLetter); - Assert.assertNotEquals(deadLetter.message(), receiveDeadLetter.message()); + Assert.assertEquals(deadLetter, receiveDeadLetter); ActorTestKit.shutdown(system); } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala index 62d19b4e044..2c778937801 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala @@ -143,6 +143,7 @@ class LoggingDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with system.eventStream ! Subscribe[AllDeadLetters](listener) //#all-deadletters val deadLetter = DeadLetter("deadLetter", mockRef, mockRef) + system.eventStream ! Publish(deadLetter) val receivedDeadLetter = probe.expectMessageType[DeadLetter] receivedDeadLetter shouldBe deadLetter } From 0bc4ead0c8d561afbe8d705381bee10df6e1b0aa Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Tue, 26 Mar 2024 17:47:31 +0800 Subject: [PATCH 4/6] remove unused imports --- .../actor/typed/eventstream/LoggingDocTest.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java index 71ac228a61b..844d51761ba 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java @@ -4,9 +4,6 @@ package akka.actor.typed.eventstream; -import static akka.actor.typed.javadsl.Adapter.spawn; -import static akka.actor.typed.javadsl.Adapter.toClassic; - import akka.actor.Actor; import akka.actor.AllDeadLetters; import akka.actor.SuppressedDeadLetter; @@ -44,11 +41,11 @@ public void subscribeToDeadLetters() { ActorSystem system = ActorSystem.create(SpawnProtocol.create(), "DeadLettersSystem"); // #subscribe-deadletter - CompletionStage> spawnActorFuture = AskPattern.ask(system, - r -> new Spawn<>(DeadLetterActor.create(), "DeadLetters", Props.empty(), r), - Duration.ofSeconds(3), system.scheduler()); - ActorRef deadLetters = spawnActorFuture.toCompletableFuture().join(); - system.eventStream().tell(new Subscribe<>(DeadLetter.class, deadLetters)); + ActorSystem.create(Behaviors.setup(ctx -> { + ActorRef listener = ctx.spawn(DeadLetterActor.create(), "listener"); + ctx.getSystem().eventStream().tell(new Subscribe<>(DeadLetter.class, listener)); + return Behaviors.empty(); + }), "DeadLettersSystem"); // #subscribe-deadletter ActorTestKit.shutdown(system); } From 7747bca35a9b2446ab387e33e03f8d6f8458669d Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Sat, 24 Aug 2024 12:08:09 +0800 Subject: [PATCH 5/6] clean --- .../typed/eventstream/LoggingDocTest.java | 54 +++++++++---------- .../typed/eventstream/LoggingDocSpec.scala | 26 +++++---- .../src/main/paradox/typed/event-stream.md | 2 +- 3 files changed, 37 insertions(+), 45 deletions(-) diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java index 844d51761ba..4dca1b0ea10 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/eventstream/LoggingDocTest.java @@ -15,8 +15,13 @@ import akka.actor.typed.SpawnProtocol; import akka.actor.typed.SpawnProtocol.Spawn; import akka.actor.typed.eventstream.EventStream.Publish; +import akka.actor.typed.eventstream.EventStream.Subscribe; +import akka.actor.typed.javadsl.AbstractBehavior; +import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Adapter; import akka.actor.typed.javadsl.AskPattern; +import akka.actor.typed.javadsl.Behaviors; +import akka.actor.typed.javadsl.Receive; import java.time.Duration; import java.util.concurrent.CompletionStage; import org.junit.Assert; @@ -26,56 +31,47 @@ import akka.actor.DeadLetter; import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; -import akka.actor.typed.eventstream.EventStream.Subscribe; -import akka.actor.typed.javadsl.AbstractBehavior; -import akka.actor.typed.javadsl.ActorContext; -import akka.actor.typed.javadsl.Behaviors; -import akka.actor.typed.javadsl.Receive; -import org.slf4j.Logger; // #imports-deadletter public class LoggingDocTest extends JUnitSuite { @Test public void subscribeToDeadLetters() { - ActorSystem system = ActorSystem.create(SpawnProtocol.create(), - "DeadLettersSystem"); - // #subscribe-deadletter - ActorSystem.create(Behaviors.setup(ctx -> { - ActorRef listener = ctx.spawn(DeadLetterActor.create(), "listener"); - ctx.getSystem().eventStream().tell(new Subscribe<>(DeadLetter.class, listener)); - return Behaviors.empty(); - }), "DeadLettersSystem"); - // #subscribe-deadletter + ActorSystem system = ActorSystem.create( + Behaviors.setup(ctx -> { + Behavior deadLetterListener = Behaviors.empty(); + // #subscribe-deadletter + ActorRef listener = ctx.spawn(deadLetterListener, "listener"); + ctx.getSystem().eventStream().tell(new Subscribe<>(DeadLetter.class, listener)); + // #subscribe-deadletter + return SpawnProtocol.create(); + }), "DeadLettersSystem"); ActorTestKit.shutdown(system); } public // #deadletter-actor - static class DeadLetterActor extends AbstractBehavior { - - final Logger log = getContext().getLog(); + static class DeadLetterActor extends AbstractBehavior { - public static Behavior create() { + public static Behavior create() { return Behaviors.setup(DeadLetterActor::new); } - public DeadLetterActor(ActorContext context) { + public DeadLetterActor(ActorContext context) { super(context); ActorRef messageAdapter = context.messageAdapter( DeadLetter.class, - d -> d + d -> d.message().toString() ); - // subscribe DeadLetter at start up. + // subscribe DeadLetter at startup. context.getSystem().eventStream() .tell(new Subscribe<>(DeadLetter.class, messageAdapter)); } @Override - public Receive createReceive() { - return newReceiveBuilder().onMessage(DeadLetter.class, msg -> { - log.info("receive dead letter: {} from <{}> to <{}>", msg, msg.sender(), - msg.recipient()); + public Receive createReceive() { + return newReceiveBuilder().onMessage(String.class, msg -> { + getContext().getLog().info("receive dead letter: {}", msg); return Behaviors.same(); }).build(); } @@ -107,8 +103,6 @@ public Electronic(String artist) { static class Listener extends AbstractBehavior { - final Logger log = getContext().getLog(); - public static Behavior create() { return Behaviors.setup(Listener::new); } @@ -122,12 +116,12 @@ public Listener(ActorContext context) { public Receive createReceive() { return newReceiveBuilder() .onMessage(Jazz.class, msg -> { - log.info("{} is listening to Jazz: {}", getContext().getSelf().path().name(), + getContext().getLog().info("{} is listening to Jazz: {}", getContext().getSelf().path().name(), msg); return Behaviors.same(); }) .onMessage(Electronic.class, msg -> { - log.info("{} is listening to Electronic: {}", + getContext().getLog().info("{} is listening to Electronic: {}", getContext().getSelf().path().name(), msg); return Behaviors.same(); }).build(); diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala index 2c778937801..2df2c21bb89 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala @@ -33,18 +33,14 @@ object LoggingDocSpec { object DeadLetterListener { - def apply(): Behavior[DeadLetter] = Behaviors.setup { context => - // subscribe DeadLetter at start up. - val adapter = context.messageAdapter[DeadLetter](d => d) + def apply(): Behavior[String] = Behaviors.setup { context => + // subscribe DeadLetter at startup. + val adapter = context.messageAdapter[DeadLetter](d => d.message.toString) context.system.eventStream ! Subscribe(adapter) Behaviors.receiveMessage { - case deadLetter: DeadLetter => - context.log.info( - "receive dead letter: {} from <{}> to <{}>", - deadLetter.message, - deadLetter.sender, - deadLetter.recipient) + case msg: String => + context.log.info("receive dead letter: {}", msg) Behaviors.same } } @@ -78,7 +74,7 @@ class LoggingDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with import LoggingDocSpec._ import akka.actor.typed.scaladsl.AskPattern._ - "allow registration to dead letters from start up" in { + "allow registration to dead letters" in { // #deadletters ActorSystem(Behaviors.setup[Void] { context => context.spawn(DeadLetterListener(), "DeadLetterListener", Props.empty) @@ -88,13 +84,15 @@ class LoggingDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with } "allow registration to dead letters" in { - // #subscribe-deadletter ActorSystem(Behaviors.setup[Void] { context => - val deadLetterListener = context.spawn(DeadLetterListener(), "DeadLetterListener", Props.empty) - context.system.eventStream ! Subscribe[DeadLetter](deadLetterListener) + val deadLetterListener = Behaviors.empty[DeadLetter] + // #subscribe-deadletter + val listenerRef: ActorRef[DeadLetter] = context.spawn(DeadLetterListener(), "DeadLetterListener") + context.system.eventStream ! Subscribe[DeadLetter](listenerRef) + // #subscribe-deadletter + Behaviors.empty }, "System") - // #subscribe-deadletter } "demonstrate superclass subscriptions on typed eventStream" in { diff --git a/akka-docs/src/main/paradox/typed/event-stream.md b/akka-docs/src/main/paradox/typed/event-stream.md index 501183e2a1a..8fba277752d 100644 --- a/akka-docs/src/main/paradox/typed/event-stream.md +++ b/akka-docs/src/main/paradox/typed/event-stream.md @@ -36,7 +36,7 @@ It uses @ref:[Subchannel Classification](#subchannel-classification) which enabl ## How to use -The following example demonstrates how a subscription works. Given an actor will subscribe DeadLetter from start up: +The following example demonstrates how a subscription works. Given an actor will subscribe DeadLetter from startup: Scala : @@snip [LoggingDocSpec.scala](/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala) { #deadletters } From 1ca337f2d93ad167af7024ecf55afc5bd1969c82 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Sat, 24 Aug 2024 12:18:31 +0800 Subject: [PATCH 6/6] fix compile --- .../scala/akka/actor/typed/eventstream/LoggingDocSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala index 2df2c21bb89..3401285a2df 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/eventstream/LoggingDocSpec.scala @@ -87,7 +87,7 @@ class LoggingDocSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with ActorSystem(Behaviors.setup[Void] { context => val deadLetterListener = Behaviors.empty[DeadLetter] // #subscribe-deadletter - val listenerRef: ActorRef[DeadLetter] = context.spawn(DeadLetterListener(), "DeadLetterListener") + val listenerRef: ActorRef[DeadLetter] = context.spawn(deadLetterListener, "DeadLetterListener") context.system.eventStream ! Subscribe[DeadLetter](listenerRef) // #subscribe-deadletter