Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1014,4 +1014,258 @@ class RestartSpec
created.get() shouldEqual 1
}
}

"A restart with backoff source with context" should {
"run normally" taggedAs TimingTest in {
val created = new AtomicInteger
val probe = RestartSourceWithContext
.withBackoff(shortRestartSettings) { () =>
created.incrementAndGet()
SourceWithContext.fromTuples(Source.unfold(0) { state =>
Some((state + 1) -> ("a" -> state))
})
}
.runWith(TestSink())

probe.requestNext("a" -> 0)
probe.requestNext("a" -> 1)
probe.requestNext("a" -> 2)
probe.requestNext("a" -> 3)

created.get shouldBe 1

probe.cancel()
}

"restart on completion" taggedAs TimingTest in {
val created = new AtomicInteger

val probe = RestartSourceWithContext
.withBackoff(shortRestartSettings) { () =>
val count = created.getAndIncrement() * 2
SourceWithContext.fromTuples(Source(Seq("a" -> count, "b" -> (count + 1))))
}
.runWith(TestSink())

EventFilter.info(start = "Restarting stream due to completion", occurrences = 2).intercept {
probe.requestNext("a" -> 0)
probe.requestNext("b" -> 1)
probe.requestNext("a" -> 2)
probe.requestNext("b" -> 3)
probe.requestNext("a" -> 4)
}

created.get() shouldBe 3

probe.cancel()
}

"restart on failure" taggedAs TimingTest in {
val created = new AtomicInteger

val sourceFactory = { () =>
SourceWithContext
.fromTuples(Source(Seq("a", "b", "c")).statefulMap(() => created.getAndIncrement() * 3)({ (offset, elem) =>
(offset + 1) -> (elem -> offset)
}, _ => None))
.map { (elem: String) =>
if (elem == "c") throw TE("failed")
else elem
}
}

val probe =
RestartSourceWithContext.withBackoff(shortRestartSettings)(sourceFactory).runWith(TestSink())

EventFilter.info(start = "Restarting stream due to failure", occurrences = 2).intercept {
probe.requestNext("a" -> 0)
probe.requestNext("b" -> 1)
// offset 2 is "c" which blew up, triggering a restart
probe.requestNext("a" -> 3)
probe.requestNext("b" -> 4)
// offset 5 is "c", dropped in the restarting
probe.requestNext("a" -> 6)
}

created.get() shouldBe 3

probe.cancel()
}

"backoff before restart" taggedAs TimingTest in {
val created = new AtomicInteger

val probe = RestartSourceWithContext
.withBackoff(restartSettings) { () =>
val count = created.getAndIncrement() * 2
SourceWithContext.fromTuples(Source(Seq("a" -> count, "b" -> (count + 1))))
}
.runWith(TestSink())

probe.requestNext("a" -> 0)
probe.requestNext("b" -> 1)

val deadline = (minBackoff - 1.millis).fromNow
probe.request(1)

probe.expectNext("a" -> 2)
deadline.isOverdue() shouldBe true

created.get() shouldBe 2

probe.cancel()
}

"reset exponential backoff back to minimum when source runs for at least minimum backoff without completing" taggedAs TimingTest in {
val created = new AtomicInteger
val probe = RestartSourceWithContext
.withBackoff(restartSettings) { () =>
val count = created.getAndIncrement() * 2
SourceWithContext.fromTuples(Source(Seq("a" -> count, "b" -> (count + 1))))
}
.runWith(TestSink())

probe.requestNext("a" -> 0)
probe.requestNext("b" -> 1)

val deadline = (minBackoff - 1.millis).fromNow
probe.request(1)
probe.expectNext("a" -> 2)
deadline.isOverdue() shouldBe true
probe.requestNext("b" -> 3)

probe.request(1)
// The probe should now back off again with increased backoff

// Wait for the delay, then subsequent backoff, to pass, so the restart count is reset
Thread.sleep(((minBackoff * 3) + 500.millis).toMillis)

probe.expectNext("a" -> 4)
probe.requestNext("b" -> 5)

probe.requestNext(2 * minBackoff - 1.milli) should be("a" -> 6)

created.get() shouldBe 4

probe.cancel()
}

"cancel the currently running SourceWithContext when canceled" taggedAs TimingTest in {
val created = new AtomicInteger()
val promise = Promise[Done]()
val probe = RestartSourceWithContext
.withBackoff(shortRestartSettings) { () =>
SourceWithContext.fromTuples(Source.repeat("a").map { _ -> created.getAndIncrement() }.watchTermination() {
(_, term) =>
promise.completeWith(term)
})
}
.runWith(TestSink())

probe.requestNext("a" -> 0)
probe.cancel()

promise.future.futureValue shouldBe Done

// wait to ensure that it isn't restarted
Thread.sleep(200)
created.get() shouldBe 1
}

"not restart the SourceWithContext when cancelled while backing off" taggedAs TimingTest in {
val created = new AtomicInteger()
val probe = RestartSourceWithContext
.withBackoff(restartSettings) { () =>
created.getAndIncrement()
SourceWithContext.fromTuples(Source.single("a" -> 1))
}
.runWith(TestSink())

probe.requestNext("a" -> 1)
probe.request(1)
// back-off delays the restart (racy...)
probe.cancel()

// wait to ensure it isn't restarted
Thread.sleep((minBackoff + 100.millis).toMillis)
created.get() shouldBe 1
}

"stop on completion if it should only be restarted on failures" taggedAs TimingTest in {
val created = new AtomicInteger()

val cgai = { () =>
created.getAndIncrement()
}

val probe = RestartSourceWithContext
.onFailuresWithBackoff(shortRestartSettings) { () =>
cgai()
SourceWithContext.fromTuples(Source(Seq("a" -> cgai(), "b" -> cgai(), "c" -> cgai()))).map {
case "c" => if (created.get() <= 4) throw new TE("failed") else "c"
case other => other
}
}
.runWith(TestSink())

probe.requestNext("a" -> 1)
probe.requestNext("b" -> 2)
// fails and restarts
probe.requestNext("a" -> 5)
probe.requestNext("b" -> 6)
probe.requestNext("c" -> 7)
probe.expectComplete()

created.get() shouldBe 8

probe.cancel()
}

"restart on failure when only due to failures should be restarted" taggedAs TimingTest in {
val created = new AtomicInteger()

val sourceFactory = { () =>
SourceWithContext
.fromTuples(Source(Seq("a", "b", "c")).statefulMap(() => created.getAndIncrement() * 3)({ (offset, elem) =>
(offset + 1) -> (elem -> offset)
}, _ => None))
.map { elem =>
if (elem == "c") throw TE("failed")
else elem
}
}

val probe =
RestartSourceWithContext.onFailuresWithBackoff(shortRestartSettings)(sourceFactory).runWith(TestSink())

probe.requestNext("a" -> 0)
probe.requestNext("b" -> 1)
// offset 2 is "c" which blew up, triggering a restart
probe.requestNext("a" -> 3)
probe.requestNext("b" -> 4)
// offset 5 is "c", dropped in the restarting
probe.requestNext("a" -> 6)

created.get() shouldBe 3

probe.cancel()
}

"not restart when maxRestarts is reached" taggedAs TimingTest in {
val created = new AtomicInteger()
val probe = RestartSourceWithContext
.withBackoff(shortRestartSettings.withMaxRestarts(1, shortMinBackoff)) { () =>
SourceWithContext.fromTuples(Source.single("a").map(_ -> created.getAndIncrement()))
}
.runWith(TestSink())

probe.requestNext("a" -> 0)
probe.requestNext("a" -> 1)
probe.expectComplete()

created.get() shouldBe 2

probe.cancel()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (C) 2023 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.stream.scaladsl

import akka.NotUsed
import akka.stream.RestartSettings

/**
* A RestartSourceWithContext wraps a [[SourceWithContext]] that gets restarted when it completes or fails.
*
* They are useful for graphs that need to run for longer than the [[SourceWithContext]] can necessarily guarantee it will,
* e.g. for [[SourceWithContext]] streams that depend on a remote service to which connectivity may be lost (crash or partition). The RestartSourceWithContext ensures that the graph can continue running while the [[SourceWithContext]] restarts.
*/
object RestartSourceWithContext {

/**
* Wrap the given [[SourceWithContext]] with a [[SourceWithContext]] that will restart it when it fails or completes using an exponential backoff.
*
* The returned [[SourceWithContext]] will not emit a complete or failure as long as maxRestarts is not reached, since the completion or failure of the wrapped [[SourceWithContext]] is handled by restarting it. The wrapped [[SourceWithContext]] can however be canceled by canceling the returned [[SourceWithContext]]. When that happens, the wrapped [[SourceWithContext]] if currently running will be canceled and will not be restarted.
*
* @param settings [[RestartSettings]] defining restart configuration
* @param sourceFactory A factory for producing the [[SourceWithContext]] to wrap
*/
def withBackoff[T, C](settings: RestartSettings)(
sourceFactory: () => SourceWithContext[T, C, _]): SourceWithContext[T, C, NotUsed] = {
// we like wrapping so much, we unwrap so that we can rewrap... since the intended usecase is for sources (like
// Kafka or whatever) which are a network hop away, this overhead is negligible
val underlyingFactory = () => sourceFactory().asSource
SourceWithContext.fromTuples(
Source.fromGraph(new RestartWithBackoffSource(underlyingFactory, settings, onlyOnFailures = false)))
}

/**
* Wrap the given [[SourceWithContext]] with a [[SourceWithContext]] that will restart it when it fails using an exponential backoff.
*
* The returned [[SourceWithContext]] will not emit a failure as long as maxRestarts is not reached, since the failure of the wrapped [[SourceWithContext]] is handled by restarting it. The wrapped [[SourceWithContext]] can however be canceled by canceling the returned [[SourceWithContext]]. When that happens, the wrapped [[SourceWithContext]] if currently running will be canceled and will not be restarted.
*
* @param settings [[RestartSettings]] defining restart configuration
* @param sourceFactory A factory for producing the [[SourceWithContext]] to wrap
*/
def onFailuresWithBackoff[T, C](settings: RestartSettings)(
sourceFactory: () => SourceWithContext[T, C, _]): SourceWithContext[T, C, NotUsed] = {
val underlyingFactory = () => sourceFactory().asSource
SourceWithContext.fromTuples(
Source.fromGraph(new RestartWithBackoffSource(underlyingFactory, settings, onlyOnFailures = true)))
}
}