Skip to content

Commit 563f9ff

Browse files
authored
chore: Keep actor ref with backpressure around until last ack arrives (#32863)
1 parent fda5d10 commit 563f9ff

File tree

2 files changed

+38
-3
lines changed

2 files changed

+38
-3
lines changed

akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ package akka.stream.scaladsl
66

77
import scala.concurrent.Promise
88
import scala.concurrent.duration._
9-
109
import akka.actor.{ Actor, ActorRef, Props }
1110
import akka.stream.Attributes.inputBuffer
1211
import akka.stream.Materializer
1312
import akka.stream.testkit._
1413
import akka.stream.testkit.scaladsl._
14+
import akka.testkit.EventFilter
1515
import akka.testkit.TestProbe
1616

1717
object ActorRefBackpressureSinkSpec {
@@ -49,7 +49,7 @@ object ActorRefBackpressureSinkSpec {
4949

5050
}
5151

52-
class ActorRefBackpressureSinkSpec extends StreamSpec {
52+
class ActorRefBackpressureSinkSpec extends StreamSpec("akka.loglevel=INFO") {
5353
import ActorRefBackpressureSinkSpec._
5454

5555
def createActor[T](c: Class[T]) =
@@ -247,6 +247,41 @@ class ActorRefBackpressureSinkSpec extends StreamSpec {
247247
probe.reply(ackMessage)
248248
probe.expectMsg(completeMessage)
249249
}
250+
251+
"stay around until final ack is sent" in {
252+
val probe = TestProbe()
253+
254+
EventFilter.info(pattern = ".*was not delivered.*", occurrences = 0).intercept {
255+
val sourceProbe = TestSource[String]()
256+
.toMat(
257+
Sink.actorRefWithBackpressure(
258+
probe.ref,
259+
initMessage,
260+
ackMessage,
261+
completeMessage,
262+
(_: Throwable) => failMessage))(Keep.left)
263+
.run()
264+
sourceProbe.ensureSubscription()
265+
266+
probe.expectMsg(initMessage)
267+
probe.reply(ackMessage)
268+
269+
sourceProbe.sendNext("one")
270+
probe.expectMsg("one")
271+
probe.reply(ackMessage)
272+
273+
sourceProbe.sendNext("two")
274+
probe.expectMsg("two")
275+
// buffer empty when complete is seen
276+
sourceProbe.sendComplete()
277+
Thread.sleep(100)
278+
probe.reply(ackMessage)
279+
probe.expectMsg(completeMessage)
280+
281+
// logging takes a while to arrive
282+
Thread.sleep(100)
283+
}
284+
}
250285
}
251286

252287
}

akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ import akka.stream.stage._
8484
}
8585

8686
override def onUpstreamFinish(): Unit = {
87-
if (buffer.isEmpty) finish()
87+
if (buffer.isEmpty && acknowledgementReceived) finish()
8888
else completeReceived = true
8989
}
9090

0 commit comments

Comments
 (0)