Skip to content

Commit 8a36d3d

Browse files
committed
Fix google cloud keep alive settings
1 parent f03bb5d commit 8a36d3d

File tree

1 file changed

+16
-5
lines changed
  • google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http

1 file changed

+16
-5
lines changed

google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,17 @@ import pekko.actor.{ ClassicActorSystemProvider, ExtendedActorSystem, Scheduler
1818
import pekko.annotation.InternalApi
1919
import pekko.dispatch.ExecutionContexts
2020
import pekko.http.scaladsl.Http.HostConnectionPool
21-
import pekko.http.scaladsl.model.headers.Authorization
21+
import pekko.http.scaladsl.model.headers.{ Authorization, Connection }
2222
import pekko.http.scaladsl.model.{ HttpRequest, HttpResponse }
23+
import pekko.http.scaladsl.settings.ConnectionPoolSettings
2324
import pekko.http.scaladsl.unmarshalling.{ FromResponseUnmarshaller, Unmarshal }
2425
import pekko.http.scaladsl.{ Http, HttpExt }
2526
import pekko.stream.connectors.google.{ GoogleAttributes, GoogleSettings, RequestSettings, RetrySettings }
2627
import pekko.stream.connectors.google.util.Retry
2728
import pekko.stream.scaladsl.{ Flow, FlowWithContext, Keep, RetryFlow }
2829

2930
import scala.concurrent.{ ExecutionContextExecutor, Future }
31+
import scala.concurrent.duration.Duration
3032
import scala.util.{ Failure, Success, Try }
3133

3234
@InternalApi
@@ -48,14 +50,21 @@ private[connectors] final class GoogleHttp private (val http: HttpExt) extends A
4850
private implicit def system: ExtendedActorSystem = http.system
4951
private implicit def ec: ExecutionContextExecutor = system.dispatcher
5052
private implicit def scheduler: Scheduler = system.scheduler
53+
private def defaultConnectionPoolSettingsWithInfKeepAlive =
54+
ConnectionPoolSettings(system).withKeepAliveTimeout(Duration.Inf)
55+
56+
private def setKeepAlive(request: HttpRequest): HttpRequest =
57+
request.removeHeader("connection").addHeader(Connection("Keep-Alive"))
5158

5259
/**
5360
* Sends a single [[HttpRequest]] and returns the raw [[HttpResponse]].
5461
*/
5562
def singleRawRequest(request: HttpRequest)(implicit settings: RequestSettings): Future[HttpResponse] = {
56-
val requestWithStandardParams = addStandardQuery(request)
57-
settings.forwardProxy.fold(http.singleRequest(requestWithStandardParams)) { proxy =>
58-
http.singleRequest(requestWithStandardParams, proxy.connectionContext, proxy.poolSettings)
63+
val requestWithStandardParams = addStandardQuery(setKeepAlive(request))
64+
settings.forwardProxy.fold(http.singleRequest(requestWithStandardParams, http.defaultClientHttpsContext,
65+
defaultConnectionPoolSettingsWithInfKeepAlive)) { proxy =>
66+
http.singleRequest(requestWithStandardParams, proxy.connectionContext,
67+
proxy.poolSettings.withKeepAliveTimeout(Duration.Inf))
5968
}
6069
}
6170

@@ -121,7 +130,7 @@ private[connectors] final class GoogleHttp private (val http: HttpExt) extends A
121130
else
122131
FlowWithContext[HttpRequest, Ctx]
123132

124-
val requestFlow = settings.requestSettings.forwardProxy match {
133+
val requestFlow = (settings.requestSettings.forwardProxy match {
125134
case None if !https =>
126135
http.cachedHostConnectionPool[Ctx](host, p)
127136
case Some(proxy) if !https =>
@@ -131,6 +140,8 @@ private[connectors] final class GoogleHttp private (val http: HttpExt) extends A
131140
case Some(proxy) if https =>
132141
http.cachedHostConnectionPoolHttps[Ctx](host, p, proxy.connectionContext, proxy.poolSettings)
133142
case _ => throw new RuntimeException(s"illegal proxy settings with https=$https")
143+
}).contramap[(HttpRequest, Ctx)] { case (request, context) =>
144+
(setKeepAlive(request), context)
134145
}
135146

136147
val unmarshalFlow = Flow[(Try[HttpResponse], Ctx)].mapAsyncUnordered(parallelism) {

0 commit comments

Comments
 (0)