I was moving my project from Pekko 1.0.1 to 1.2.0, and one of my tests, which uses source.recoverWithRetries, started to fail.
I believe that changes #1775 and #1663, which were merged into the Pekko 1.2.0 release, actually changed the semantics of the API when the source is a Source.failed: a max attempts value of -1 no longer works properly.
Please consider the following test:
import java.util.concurrent.atomic.AtomicInteger
import org.apache.pekko.stream.scaladsl.Sink
import org.apache.pekko.stream.scaladsl.Source
import org.apache.pekko.util.ByteString
import org.apache.pekko.NotUsed
import scala.concurrent.Await
import scala.concurrent.duration.Duration
class ResilientSourceTest extends UnitTest {

  def withRetriesTest(originSource: Source[ByteString, Any])(fallbackTo: Long => Source[ByteString, NotUsed])(shouldRetry: Throwable => Boolean = { _ => true }): Source[ByteString, NotUsed] =
    originSource.recoverWithRetries(
      -1,
      {
        case e: Throwable if shouldRetry(e) =>
          fallbackTo(0)
      }
    ).mapMaterializedValue(_ => NotUsed)

  test("withRetries should stop to retry if shouldRetry decides otherwise") {
    val counter = new AtomicInteger()
    val source =
      withRetriesTest(failedSource("origin")) { _ =>
        counter.incrementAndGet()
        // failedSource1("works")
        failedSource("does not work")
      } { _ =>
        counter.get() < 3
      }

    assertThrows[ArithmeticException] {
      Await.result(source.runWith(Sink.ignore), Duration.Inf)
    }
    assert(counter.get() == 3)
  }

  private def failedSource(message: String): Source[ByteString, NotUsed] =
    Source.failed(new ArithmeticException(message))

  private def failedSource1(message: String): Source[ByteString, NotUsed] =
    Source.single(5).map(_ / 0).map(s => ByteString.fromString(s.toString))
}
When the fallback is a Source.failed (the failedSource case), the test does not pass, although I would expect it to, just as it would if the source produced the error during execution (the failedSource1 case).
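To make the expectation concrete, here is a minimal plain-Scala model of the semantics I am assuming, with no Pekko dependency. It is only a sketch: `RetryModel`, `recoverWithRetriesModel` and `run` are hypothetical names, and a "source" is reduced to a function returning a `Try`. It is not how Pekko implements the operator. The assumption is that a negative attempts value means "keep recovering for as long as the partial function is defined for the failure".

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import scala.util.{ Failure, Try }

// Hypothetical model, not Pekko code: a "source" is just a Try result.
object RetryModel {

  // attempts < 0: unlimited recoveries; attempts == 0: never recover;
  // attempts > 0: recover at most that many times.
  @tailrec
  def recoverWithRetriesModel[A](current: Try[A], attempts: Int)(
      pf: PartialFunction[Throwable, () => Try[A]]): Try[A] =
    current match {
      case Failure(e) if attempts != 0 =>
        pf.lift(e) match {
          case Some(fallback) =>
            val remaining = if (attempts < 0) attempts else attempts - 1
            recoverWithRetriesModel(fallback(), remaining)(pf)
          case None => current // pf no longer applies: propagate the failure
        }
      case other => other
    }

  // Mirrors the test in this report: every fallback fails immediately,
  // and the guard stops recovery once three fallbacks have been created.
  def run(): (Try[Int], Int) = {
    val counter = new AtomicInteger()
    val origin: Try[Int] = Failure(new ArithmeticException("origin"))
    val result = recoverWithRetriesModel[Int](origin, -1) {
      case _: Throwable if counter.get() < 3 =>
        () => {
          counter.incrementAndGet()
          Failure(new ArithmeticException("does not work"))
        }
    }
    (result, counter.get())
  }
}
```

Under this model, `RetryModel.run()` ends with an `ArithmeticException` failure after exactly three fallbacks. That is the behaviour the test above expects from `recoverWithRetries(-1, ...)`, whether the fallback fails up front or during execution.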
Regards,
Kyrylo