diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala index 596b269a1cb..0865245c357 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala @@ -1014,4 +1014,258 @@ class RestartSpec created.get() shouldEqual 1 } } + + "A restart with backoff source with context" should { + "run normally" taggedAs TimingTest in { + val created = new AtomicInteger + val probe = RestartSourceWithContext + .withBackoff(shortRestartSettings) { () => + created.incrementAndGet() + SourceWithContext.fromTuples(Source.unfold(0) { state => + Some((state + 1) -> ("a" -> state)) + }) + } + .runWith(TestSink()) + + probe.requestNext("a" -> 0) + probe.requestNext("a" -> 1) + probe.requestNext("a" -> 2) + probe.requestNext("a" -> 3) + + created.get shouldBe 1 + + probe.cancel() + } + + "restart on completion" taggedAs TimingTest in { + val created = new AtomicInteger + + val probe = RestartSourceWithContext + .withBackoff(shortRestartSettings) { () => + val count = created.getAndIncrement() * 2 + SourceWithContext.fromTuples(Source(Seq("a" -> count, "b" -> (count + 1)))) + } + .runWith(TestSink()) + + EventFilter.info(start = "Restarting stream due to completion", occurrences = 2).intercept { + probe.requestNext("a" -> 0) + probe.requestNext("b" -> 1) + probe.requestNext("a" -> 2) + probe.requestNext("b" -> 3) + probe.requestNext("a" -> 4) + } + + created.get() shouldBe 3 + + probe.cancel() + } + + "restart on failure" taggedAs TimingTest in { + val created = new AtomicInteger + + val sourceFactory = { () => + SourceWithContext + .fromTuples(Source(Seq("a", "b", "c")).statefulMap(() => created.getAndIncrement() * 3)({ (offset, elem) => + (offset + 1) -> (elem -> offset) + }, _ => None)) + .map { (elem: String) => + if (elem == "c") throw TE("failed") + else elem + } + } + + val probe = + RestartSourceWithContext.withBackoff(shortRestartSettings)(sourceFactory).runWith(TestSink()) + + EventFilter.info(start = "Restarting stream due to failure", occurrences = 2).intercept { + probe.requestNext("a" -> 0) + probe.requestNext("b" -> 1) + // offset 2 is "c" which blew up, triggering a restart + probe.requestNext("a" -> 3) + probe.requestNext("b" -> 4) + // offset 5 is "c", dropped in the restarting + probe.requestNext("a" -> 6) + } + + created.get() shouldBe 3 + + probe.cancel() + } + + "backoff before restart" taggedAs TimingTest in { + val created = new AtomicInteger + + val probe = RestartSourceWithContext + .withBackoff(restartSettings) { () => + val count = created.getAndIncrement() * 2 + SourceWithContext.fromTuples(Source(Seq("a" -> count, "b" -> (count + 1)))) + } + .runWith(TestSink()) + + probe.requestNext("a" -> 0) + probe.requestNext("b" -> 1) + + val deadline = (minBackoff - 1.millis).fromNow + probe.request(1) + + probe.expectNext("a" -> 2) + deadline.isOverdue() shouldBe true + + created.get() shouldBe 2 + + probe.cancel() + } + + "reset exponential backoff back to minimum when source runs for at least minimum backoff without completing" taggedAs TimingTest in { + val created = new AtomicInteger + val probe = RestartSourceWithContext + .withBackoff(restartSettings) { () => + val count = created.getAndIncrement() * 2 + SourceWithContext.fromTuples(Source(Seq("a" -> count, "b" -> (count + 1)))) + } + .runWith(TestSink()) + + probe.requestNext("a" -> 0) + probe.requestNext("b" -> 1) + + val deadline = (minBackoff - 1.millis).fromNow + probe.request(1) + probe.expectNext("a" -> 2) + deadline.isOverdue() shouldBe true + probe.requestNext("b" -> 3) + + probe.request(1) + // The probe should now back off again with increased backoff + + // Wait for the delay, then subsequent backoff, to pass, so the restart count is reset + Thread.sleep(((minBackoff * 3) + 500.millis).toMillis) + + probe.expectNext("a" -> 4) + probe.requestNext("b" -> 5) + + probe.requestNext(2 * minBackoff - 1.milli) should be("a" -> 6) + + created.get() shouldBe 4 + + probe.cancel() + } + + "cancel the currently running SourceWithContext when canceled" taggedAs TimingTest in { + val created = new AtomicInteger() + val promise = Promise[Done]() + val probe = RestartSourceWithContext + .withBackoff(shortRestartSettings) { () => + SourceWithContext.fromTuples(Source.repeat("a").map { _ -> created.getAndIncrement() }.watchTermination() { + (_, term) => + promise.completeWith(term) + }) + } + .runWith(TestSink()) + + probe.requestNext("a" -> 0) + probe.cancel() + + promise.future.futureValue shouldBe Done + + // wait to ensure that it isn't restarted + Thread.sleep(200) + created.get() shouldBe 1 + } + + "not restart the SourceWithContext when cancelled while backing off" taggedAs TimingTest in { + val created = new AtomicInteger() + val probe = RestartSourceWithContext + .withBackoff(restartSettings) { () => + created.getAndIncrement() + SourceWithContext.fromTuples(Source.single("a" -> 1)) + } + .runWith(TestSink()) + + probe.requestNext("a" -> 1) + probe.request(1) + // back-off delays the restart (racy...) + probe.cancel() + + // wait to ensure it isn't restarted + Thread.sleep((minBackoff + 100.millis).toMillis) + created.get() shouldBe 1 + } + + "stop on completion if it should only be restarted on failures" taggedAs TimingTest in { + val created = new AtomicInteger() + + val cgai = { () => + created.getAndIncrement() + } + + val probe = RestartSourceWithContext + .onFailuresWithBackoff(shortRestartSettings) { () => + cgai() + SourceWithContext.fromTuples(Source(Seq("a" -> cgai(), "b" -> cgai(), "c" -> cgai()))).map { + case "c" => if (created.get() <= 4) throw new TE("failed") else "c" + case other => other + } + } + .runWith(TestSink()) + + probe.requestNext("a" -> 1) + probe.requestNext("b" -> 2) + // fails and restarts + probe.requestNext("a" -> 5) + probe.requestNext("b" -> 6) + probe.requestNext("c" -> 7) + probe.expectComplete() + + created.get() shouldBe 8 + + probe.cancel() + } + + "restart on failure when only due to failures should be restarted" taggedAs TimingTest in { + val created = new AtomicInteger() + + val sourceFactory = { () => + SourceWithContext + .fromTuples(Source(Seq("a", "b", "c")).statefulMap(() => created.getAndIncrement() * 3)({ (offset, elem) => + (offset + 1) -> (elem -> offset) + }, _ => None)) + .map { elem => + if (elem == "c") throw TE("failed") + else elem + } + } + + val probe = + RestartSourceWithContext.onFailuresWithBackoff(shortRestartSettings)(sourceFactory).runWith(TestSink()) + + probe.requestNext("a" -> 0) + probe.requestNext("b" -> 1) + // offset 2 is "c" which blew up, triggering a restart + probe.requestNext("a" -> 3) + probe.requestNext("b" -> 4) + // offset 5 is "c", dropped in the restarting + probe.requestNext("a" -> 6) + + created.get() shouldBe 3 + + probe.cancel() + } + + "not restart when maxRestarts is reached" taggedAs TimingTest in { + val created = new AtomicInteger() + val probe = RestartSourceWithContext + .withBackoff(shortRestartSettings.withMaxRestarts(1, shortMinBackoff)) { () => + SourceWithContext.fromTuples(Source.single("a").map(_ -> created.getAndIncrement())) + } + .runWith(TestSink()) + + probe.requestNext("a" -> 0) + probe.requestNext("a" -> 1) + probe.expectComplete() + + created.get() shouldBe 2 + + probe.cancel() + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/RestartSourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/RestartSourceWithContext.scala new file mode 100644 index 00000000000..57527ab51c3 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/RestartSourceWithContext.scala @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2023 Lightbend Inc. + */ + +package akka.stream.javadsl + +import akka.NotUsed +import akka.japi.function.Creator +import akka.stream.RestartSettings +import akka.stream.scaladsl + +/** + * A RestartSourceWithContext wraps a [[SourceWithContext]] that gets restarted when it completes or fails. + * + * They are useful for graphs that need to run for longer than the [[SourceWithContext]] can necessarily guarantee it will, + * e.g. for [[SourceWithContext]] streams that depend on a remote service to which connectivity may be lost (crash or partition). The RestartSourceWithContext ensures that the graph can continue running while the SourceWithContext restarts. + */ +object RestartSourceWithContext { + + /** + * Wrap the given [[SourceWithContext]] with a SourceWithContext that will restart it when it fails or completes using an exponential backoff. + * + * The returned [[SourceWithContext]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion or failure of the wrapped SourceWithContext is handled by restarting it. The wrapped SourceWithContext can however be canceled by canceling the returned SourceWithContext. When that happens, the wrapped SourceWithContext will be canceled and will not be restarted. + * + * @param settings [[RestartSettings]] defining the restart configuration + * @param sourceFactory A factory for producing the SourceWithContext to wrap + */ + def withBackoff[T, C]( + settings: RestartSettings, + sourceFactory: Creator[SourceWithContext[T, C, _]]): SourceWithContext[T, C, NotUsed] = { + val underlyingFactory = () => sourceFactory.create().asScala + new SourceWithContext(scaladsl.RestartSourceWithContext.withBackoff(settings)(underlyingFactory)) + } + + /** + * Wrap the given [[SourceWithContext]] with a SourceWithContext that will restart it when it fails using an exponential backoff. + * + * The returned [[SourceWithContext]] will not emit a failure as long as maxRestarts is not reached, since the failure of the wrapped SourceWithContext is handled by restarting it. The wrapped SourceWithContext can however be canceled by canceling the returned SourceWithContext. When that happens, the wrapped SourceWithContext if currently running will be canceled and will not be restarted. + * + * @param settings [[RestartSettings]] defining the restart configuration + * @param sourceFactory A factory for producing the SourceWithContext to wrap + */ + def onFailuresWithBackoff[T, C]( + settings: RestartSettings, + sourceFactory: Creator[SourceWithContext[T, C, _]]): SourceWithContext[T, C, NotUsed] = { + val underlyingFactory = () => sourceFactory.create().asScala + new SourceWithContext(scaladsl.RestartSourceWithContext.onFailuresWithBackoff(settings)(underlyingFactory)) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSourceWithContext.scala new file mode 100644 index 00000000000..0decac18dc6 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartSourceWithContext.scala @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2023 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.NotUsed +import akka.stream.RestartSettings + +/** + * A RestartSourceWithContext wraps a [[SourceWithContext]] that gets restarted when it completes or fails. + * + * They are useful for graphs that need to run for longer than the [[SourceWithContext]] can necessarily guarantee it will, + * e.g. for [[SourceWithContext]] streams that depend on a remote service to which connectivity may be lost (crash or partition). The RestartSourceWithContext ensures that the graph can continue running while the [[SourceWithContext]] restarts. + */ +object RestartSourceWithContext { + + /** + * Wrap the given [[SourceWithContext]] with a [[SourceWithContext]] that will restart it when it fails or completes using an exponential backoff. + * + * The returned [[SourceWithContext]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion or failure of the wrapped [[SourceWithContext]] is handled by restarting it. The wrapped [[SourceWithContext]] can however be canceled by canceling the returned [[SourceWithContext]]. When that happens, the wrapped [[SourceWithContext]] if currently running will be canceled and will not be restarted. + * + * @param settings [[RestartSettings]] defining restart configuration + * @param sourceFactory A factory for producing the [[SourceWithContext]] to wrap + */ + def withBackoff[T, C](settings: RestartSettings)( + sourceFactory: () => SourceWithContext[T, C, _]): SourceWithContext[T, C, NotUsed] = { + val underlyingFactory = () => sourceFactory().asSource + SourceWithContext.fromTuples( + Source.fromGraph(new RestartWithBackoffSource(underlyingFactory, settings, onlyOnFailures = false))) + } + + /** + * Wrap the given [[SourceWithContext]] with a [[SourceWithContext]] that will restart it when it fails using an exponential backoff. + * + * The returned [[SourceWithContext]] will not emit a failure as long as maxRestarts is not reached, since the failure of the wrapped [[SourceWithContext]] is handled by restarting it. The wrapped [[SourceWithContext]] can however be canceled by canceling the returned [[SourceWithContext]]. When that happens, the wrapped [[SourceWithContext]] if currently running will be canceled and will not be restarted. + * + * @param settings [[RestartSettings]] defining restart configuration + * @param sourceFactory A factory for producing the [[SourceWithContext]] to wrap + */ + def onFailuresWithBackoff[T, C](settings: RestartSettings)( + sourceFactory: () => SourceWithContext[T, C, _]): SourceWithContext[T, C, NotUsed] = { + val underlyingFactory = () => sourceFactory().asSource + SourceWithContext.fromTuples( + Source.fromGraph(new RestartWithBackoffSource(underlyingFactory, settings, onlyOnFailures = true))) + } +}