Skip to content

Commit 159717b

Browse files
committed
fix: Avoid turning all stream timeouts to TcpIdleTimeoutException
1 parent 9745f88 commit 159717b

File tree

2 files changed

+14
-18
lines changed

2 files changed

+14
-18
lines changed

akka-stream/src/main/scala/akka/stream/impl/Timers.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,11 @@ import akka.stream.stage._
151151

152152
}
153153

154-
final class IdleTimeoutBidi[I, O](val timeout: FiniteDuration) extends GraphStage[BidiShape[I, I, O, O]] {
154+
final class IdleTimeoutBidi[I, O](
155+
val timeout: FiniteDuration,
156+
createFailure: FiniteDuration => Throwable = timeout =>
157+
new StreamIdleTimeoutException(s"No elements passed in the last ${timeout.toCoarsest}."))
158+
extends GraphStage[BidiShape[I, I, O, O]] {
155159
val in1 = Inlet[I]("in1")
156160
val in2 = Inlet[O]("in2")
157161
val out1 = Outlet[I]("out1")
@@ -170,7 +174,7 @@ import akka.stream.stage._
170174

171175
final override def onTimer(key: Any): Unit =
172176
if (nextDeadline - System.nanoTime < 0)
173-
failStage(new StreamIdleTimeoutException(s"No elements passed in the last ${timeout.toCoarsest}."))
177+
failStage(createFailure(timeout))
174178

175179
override def preStart(): Unit =
176180
scheduleWithFixedDelay(GraphStageLogicTimer, timeoutCheckInterval(timeout), timeoutCheckInterval(timeout))

akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,11 @@
55
package akka.stream.impl.io
66

77
import java.net.InetSocketAddress
8-
import java.util.concurrent.TimeoutException
98
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong }
10-
119
import scala.annotation.nowarn
1210
import scala.collection.immutable
1311
import scala.concurrent.{ Future, Promise }
1412
import scala.concurrent.duration.{ Duration, FiniteDuration }
15-
1613
import akka.{ Done, NotUsed }
1714
import akka.actor.{ ActorRef, Terminated }
1815
import akka.annotation.InternalApi
@@ -22,6 +19,7 @@ import akka.io.Tcp
2219
import akka.io.Tcp._
2320
import akka.stream._
2421
import akka.stream.impl.ReactiveStreamsCompliance
22+
import akka.stream.impl.Timers
2523
import akka.stream.impl.fusing.GraphStages.detacher
2624
import akka.stream.scaladsl.{ BidiFlow, Flow, TcpIdleTimeoutException, Tcp => StreamTcp }
2725
import akka.stream.scaladsl.Tcp.{ OutgoingConnection, ServerBinding }
@@ -593,19 +591,13 @@ private[stream] object ConnectionSourceStage {
593591
case Some(address) => s" on connection to [$address]"
594592
case _ => ""
595593
}
594+
BidiFlow.fromGraph(
595+
new Timers.IdleTimeoutBidi(
596+
idleTimeout,
597+
createFailure = _ =>
598+
new TcpIdleTimeoutException(
599+
s"TCP idle-timeout encountered$connectionToString, no bytes passed in the last $idleTimeout",
600+
idleTimeout)))
596601

597-
val toNetTimeout: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] =
598-
BidiFlow.fromFlows(
599-
Flow[ByteString].mapError {
600-
case _: TimeoutException =>
601-
new TcpIdleTimeoutException(
602-
s"TCP idle-timeout encountered$connectionToString, no bytes passed in the last $idleTimeout",
603-
idleTimeout)
604-
},
605-
Flow[ByteString])
606-
val fromNetTimeout: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] =
607-
toNetTimeout.reversed // now the bottom flow transforms the exception, the top one doesn't (since that one is "fromNet")
608-
609-
fromNetTimeout.atop(BidiFlow.bidirectionalIdleTimeout[ByteString, ByteString](idleTimeout)).atop(toNetTimeout)
610602
}
611603
}

0 commit comments

Comments
 (0)