@@ -18,15 +18,17 @@ import pekko.actor.{ ClassicActorSystemProvider, ExtendedActorSystem, Scheduler
1818import  pekko .annotation .InternalApi 
1919import  pekko .dispatch .ExecutionContexts 
2020import  pekko .http .scaladsl .Http .HostConnectionPool 
21- import  pekko .http .scaladsl .model .headers .Authorization 
21+ import  pekko .http .scaladsl .model .headers .{  Authorization ,  Connection  } 
2222import  pekko .http .scaladsl .model .{ HttpRequest , HttpResponse  }
23+ import  pekko .http .scaladsl .settings .ConnectionPoolSettings 
2324import  pekko .http .scaladsl .unmarshalling .{ FromResponseUnmarshaller , Unmarshal  }
2425import  pekko .http .scaladsl .{ Http , HttpExt  }
2526import  pekko .stream .connectors .google .{ GoogleAttributes , GoogleSettings , RequestSettings , RetrySettings  }
2627import  pekko .stream .connectors .google .util .Retry 
2728import  pekko .stream .scaladsl .{ Flow , FlowWithContext , Keep , RetryFlow  }
2829
2930import  scala .concurrent .{ ExecutionContextExecutor , Future  }
31+ import  scala .concurrent .duration .Duration 
3032import  scala .util .{ Failure , Success , Try  }
3133
3234@ InternalApi 
@@ -48,14 +50,21 @@ private[connectors] final class GoogleHttp private (val http: HttpExt) extends A
4850  private  implicit  def  system :  ExtendedActorSystem  =  http.system
4951  private  implicit  def  ec :  ExecutionContextExecutor  =  system.dispatcher
5052  private  implicit  def  scheduler :  Scheduler  =  system.scheduler
53+   private  def  defaultConnectionPoolSettingsWithInfKeepAlive  = 
54+     ConnectionPoolSettings (system).withKeepAliveTimeout(Duration .Inf )
55+ 
56+   private  def  setKeepAlive (request : HttpRequest ):  HttpRequest  = 
57+     request.removeHeader(" connection"  ).addHeader(Connection (" Keep-Alive"  ))
5158
5259  /**  
5360   * Sends a single [[HttpRequest ]] and returns the raw [[HttpResponse ]]. 
5461   */  
5562  def  singleRawRequest (request : HttpRequest )(implicit  settings : RequestSettings ):  Future [HttpResponse ] =  {
56-     val  requestWithStandardParams  =  addStandardQuery(request)
57-     settings.forwardProxy.fold(http.singleRequest(requestWithStandardParams)) { proxy => 
58-       http.singleRequest(requestWithStandardParams, proxy.connectionContext, proxy.poolSettings)
63+     val  requestWithStandardParams  =  addStandardQuery(setKeepAlive(request))
64+     settings.forwardProxy.fold(http.singleRequest(requestWithStandardParams, http.defaultClientHttpsContext,
65+       defaultConnectionPoolSettingsWithInfKeepAlive)) { proxy => 
66+       http.singleRequest(requestWithStandardParams, proxy.connectionContext,
67+         proxy.poolSettings.withKeepAliveTimeout(Duration .Inf ))
5968    }
6069  }
6170
@@ -121,7 +130,7 @@ private[connectors] final class GoogleHttp private (val http: HttpExt) extends A
121130          else 
122131            FlowWithContext [HttpRequest , Ctx ]
123132
124-         val  requestFlow  =  settings.requestSettings.forwardProxy match  {
133+         val  requestFlow  =  ( settings.requestSettings.forwardProxy match  {
125134          case  None  if  ! https => 
126135            http.cachedHostConnectionPool[Ctx ](host, p)
127136          case  Some (proxy) if  ! https => 
@@ -131,6 +140,8 @@ private[connectors] final class GoogleHttp private (val http: HttpExt) extends A
131140          case  Some (proxy) if  https => 
132141            http.cachedHostConnectionPoolHttps[Ctx ](host, p, proxy.connectionContext, proxy.poolSettings)
133142          case  _ =>  throw  new  RuntimeException (s " illegal proxy settings with https= $https" )
143+         }).contramap[(HttpRequest , Ctx )] { case  (request, context) => 
144+           (setKeepAlive(request), context)
134145        }
135146
136147        val  unmarshalFlow  =  Flow [(Try [HttpResponse ], Ctx )].mapAsyncUnordered(parallelism) {
0 commit comments