Skip to content

Commit 2da9fb6

Browse files
committed
Fix google cloud keep alive settings
1 parent 0cc560d commit 2da9fb6

File tree

2 files changed

+20
-7
lines changed
  • google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http
  • google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl

2 files changed

+20
-7
lines changed

google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,17 @@ import pekko.actor.{ ClassicActorSystemProvider, ExtendedActorSystem, Scheduler
1818
import pekko.annotation.InternalApi
1919
import pekko.dispatch.ExecutionContexts
2020
import pekko.http.scaladsl.Http.HostConnectionPool
21-
import pekko.http.scaladsl.model.headers.Authorization
21+
import pekko.http.scaladsl.model.headers.{ Authorization, Connection }
2222
import pekko.http.scaladsl.model.{ HttpRequest, HttpResponse }
23+
import pekko.http.scaladsl.settings.ConnectionPoolSettings
2324
import pekko.http.scaladsl.unmarshalling.{ FromResponseUnmarshaller, Unmarshal }
2425
import pekko.http.scaladsl.{ Http, HttpExt }
2526
import pekko.stream.connectors.google.{ GoogleAttributes, GoogleSettings, RequestSettings, RetrySettings }
2627
import pekko.stream.connectors.google.util.Retry
2728
import pekko.stream.scaladsl.{ Flow, FlowWithContext, Keep, RetryFlow }
2829

2930
import scala.concurrent.{ ExecutionContextExecutor, Future }
31+
import scala.concurrent.duration.Duration
3032
import scala.util.{ Failure, Success, Try }
3133

3234
@InternalApi
@@ -48,15 +50,24 @@ private[connectors] final class GoogleHttp private (val http: HttpExt) extends A
4850
private implicit def system: ExtendedActorSystem = http.system
4951
private implicit def ec: ExecutionContextExecutor = system.dispatcher
5052
private implicit def scheduler: Scheduler = system.scheduler
53+
private def defaultConnectionPoolSettingsWithInfKeepAlive =
54+
ConnectionPoolSettings(system).withKeepAliveTimeout(Duration.Inf)
55+
56+
private def setKeepAlive(request: HttpRequest): HttpRequest =
57+
if (request.headers.exists { header => header.is("connection") || header.is("keep-alive") }) {
58+
request.removeHeader("connection").removeHeader("keep-alive").addHeader(Connection("Keep-Alive"))
59+
} else request.addHeader(Connection("Keep-Alive"))
5160

5261
/**
5362
* Sends a single [[HttpRequest]] and returns the raw [[HttpResponse]].
5463
*/
5564
def singleRawRequest(request: HttpRequest)(implicit settings: RequestSettings, googleSettings: GoogleSettings)
5665
: Future[HttpResponse] = {
57-
val requestWithStandardParams = addStandardQuery(request)
58-
settings.forwardProxy.fold(http.singleRequest(requestWithStandardParams)) { proxy =>
59-
http.singleRequest(requestWithStandardParams, proxy.connectionContext, proxy.poolSettings)
66+
val requestWithStandardParams = addStandardQuery(setKeepAlive(request))
67+
settings.forwardProxy.fold(http.singleRequest(requestWithStandardParams, http.defaultClientHttpsContext,
68+
defaultConnectionPoolSettingsWithInfKeepAlive)) { proxy =>
69+
http.singleRequest(requestWithStandardParams, proxy.connectionContext,
70+
proxy.poolSettings.withKeepAliveTimeout(Duration.Inf))
6071
}
6172
}
6273

@@ -123,7 +134,7 @@ private[connectors] final class GoogleHttp private (val http: HttpExt) extends A
123134
else
124135
FlowWithContext[HttpRequest, Ctx]
125136

126-
val requestFlow = settings.requestSettings.forwardProxy match {
137+
val requestFlow = (settings.requestSettings.forwardProxy match {
127138
case None if !https =>
128139
http.cachedHostConnectionPool[Ctx](host, p)
129140
case Some(proxy) if !https =>
@@ -133,6 +144,8 @@ private[connectors] final class GoogleHttp private (val http: HttpExt) extends A
133144
case Some(proxy) if https =>
134145
http.cachedHostConnectionPoolHttps[Ctx](host, p, proxy.connectionContext, proxy.poolSettings)
135146
case _ => throw new RuntimeException(s"illegal proxy settings with https=$https")
147+
}).contramap[(HttpRequest, Ctx)] { case (request, context) =>
148+
(setKeepAlive(request), context)
136149
}
137150

138151
val unmarshalFlow = Flow[(Try[HttpResponse], Ctx)].mapAsyncUnordered(parallelism) {

google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ class FcmSenderSpec
8383
val request: HttpRequest = captor.getValue
8484
Unmarshal(request.entity).to[FcmSend].futureValue shouldBe FcmSend(false, FcmNotification.empty)
8585
request.uri.toString should startWith("https://fcm.googleapis.com/v1/projects/projectId/messages:send")
86-
request.headers.size shouldBe 1
87-
request.headers.head should matchPattern { case HttpHeader("authorization", "Bearer <no-token>") => }
86+
request.headers.size shouldBe 2
87+
request.headers(1) should matchPattern { case HttpHeader("authorization", "Bearer <no-token>") => }
8888
}
8989

9090
"parse the success response correctly" in {

0 commit comments

Comments
 (0)