Skip to content

Commit 19fa066

Browse files
committed
Fix google cloud keep alive settings
1 parent f03bb5d commit 19fa066

File tree

2 files changed

+20
-7
lines changed
  • google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http
  • google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl

2 files changed

+20
-7
lines changed

google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,17 @@ import pekko.actor.{ ClassicActorSystemProvider, ExtendedActorSystem, Scheduler
1818
import pekko.annotation.InternalApi
1919
import pekko.dispatch.ExecutionContexts
2020
import pekko.http.scaladsl.Http.HostConnectionPool
21-
import pekko.http.scaladsl.model.headers.Authorization
21+
import pekko.http.scaladsl.model.headers.{ Authorization, Connection }
2222
import pekko.http.scaladsl.model.{ HttpRequest, HttpResponse }
23+
import pekko.http.scaladsl.settings.ConnectionPoolSettings
2324
import pekko.http.scaladsl.unmarshalling.{ FromResponseUnmarshaller, Unmarshal }
2425
import pekko.http.scaladsl.{ Http, HttpExt }
2526
import pekko.stream.connectors.google.{ GoogleAttributes, GoogleSettings, RequestSettings, RetrySettings }
2627
import pekko.stream.connectors.google.util.Retry
2728
import pekko.stream.scaladsl.{ Flow, FlowWithContext, Keep, RetryFlow }
2829

2930
import scala.concurrent.{ ExecutionContextExecutor, Future }
31+
import scala.concurrent.duration.Duration
3032
import scala.util.{ Failure, Success, Try }
3133

3234
@InternalApi
@@ -48,14 +50,23 @@ private[connectors] final class GoogleHttp private (val http: HttpExt) extends A
4850
private implicit def system: ExtendedActorSystem = http.system
4951
private implicit def ec: ExecutionContextExecutor = system.dispatcher
5052
private implicit def scheduler: Scheduler = system.scheduler
53+
private def defaultConnectionPoolSettingsWithInfKeepAlive =
54+
ConnectionPoolSettings(system).withKeepAliveTimeout(Duration.Inf)
55+
56+
private def setKeepAlive(request: HttpRequest): HttpRequest =
57+
if (request.headers.exists { header => header.is("connection") || header.is("keep-alive") }) {
58+
request.removeHeader("connection").removeHeader("keep-alive").addHeader(Connection("Keep-Alive"))
59+
} else request.addHeader(Connection("Keep-Alive"))
5160

5261
/**
5362
* Sends a single [[HttpRequest]] and returns the raw [[HttpResponse]].
5463
*/
5564
def singleRawRequest(request: HttpRequest)(implicit settings: RequestSettings): Future[HttpResponse] = {
56-
val requestWithStandardParams = addStandardQuery(request)
57-
settings.forwardProxy.fold(http.singleRequest(requestWithStandardParams)) { proxy =>
58-
http.singleRequest(requestWithStandardParams, proxy.connectionContext, proxy.poolSettings)
65+
val requestWithStandardParams = addStandardQuery(setKeepAlive(request))
66+
settings.forwardProxy.fold(http.singleRequest(requestWithStandardParams, http.defaultClientHttpsContext,
67+
defaultConnectionPoolSettingsWithInfKeepAlive)) { proxy =>
68+
http.singleRequest(requestWithStandardParams, proxy.connectionContext,
69+
proxy.poolSettings.withKeepAliveTimeout(Duration.Inf))
5970
}
6071
}
6172

@@ -121,7 +132,7 @@ private[connectors] final class GoogleHttp private (val http: HttpExt) extends A
121132
else
122133
FlowWithContext[HttpRequest, Ctx]
123134

124-
val requestFlow = settings.requestSettings.forwardProxy match {
135+
val requestFlow = (settings.requestSettings.forwardProxy match {
125136
case None if !https =>
126137
http.cachedHostConnectionPool[Ctx](host, p)
127138
case Some(proxy) if !https =>
@@ -131,6 +142,8 @@ private[connectors] final class GoogleHttp private (val http: HttpExt) extends A
131142
case Some(proxy) if https =>
132143
http.cachedHostConnectionPoolHttps[Ctx](host, p, proxy.connectionContext, proxy.poolSettings)
133144
case _ => throw new RuntimeException(s"illegal proxy settings with https=$https")
145+
}).contramap[(HttpRequest, Ctx)] { case (request, context) =>
146+
(setKeepAlive(request), context)
134147
}
135148

136149
val unmarshalFlow = Flow[(Try[HttpResponse], Ctx)].mapAsyncUnordered(parallelism) {

google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ class FcmSenderSpec
8383
val request: HttpRequest = captor.getValue
8484
Unmarshal(request.entity).to[FcmSend].futureValue shouldBe FcmSend(false, FcmNotification.empty)
8585
request.uri.toString should startWith("https://fcm.googleapis.com/v1/projects/projectId/messages:send")
86-
request.headers.size shouldBe 1
87-
request.headers.head should matchPattern { case HttpHeader("authorization", "Bearer <no-token>") => }
86+
request.headers.size shouldBe 2
87+
request.headers(1) should matchPattern { case HttpHeader("authorization", "Bearer <no-token>") => }
8888
}
8989

9090
"parse the success response correctly" in {

0 commit comments

Comments
 (0)