Skip to content

Commit 0e54409

Browse files
committed
Akka & pekko
1 parent 07b7bbf commit 0e54409

File tree

6 files changed

+93
-28
lines changed

6 files changed

+93
-28
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package sttp.client4.akkahttp
2+
3+
import akka.util.ByteString
4+
import akka.stream.scaladsl.Compression
5+
import sttp.capabilities.akka.AkkaStreams
6+
import akka.stream.scaladsl.Source
7+
import sttp.client4._
8+
import sttp.client4.compression.DeflateDefaultCompressor
9+
import sttp.client4.compression.GZipDefaultCompressor
10+
import sttp.client4.compression.Compressor
11+
import akka.stream.scaladsl.StreamConverters
12+
import akka.stream.scaladsl.FileIO
13+
14+
trait AkkaCompressor[R <: AkkaStreams] extends Compressor[R] {
15+
override abstract def apply(body: GenericRequestBody[R], encoding: String): GenericRequestBody[R] =
16+
body match {
17+
case InputStreamBody(b, _) => StreamBody(AkkaStreams)(compressStream(StreamConverters.fromInputStream(() => b)))
18+
case StreamBody(b) => StreamBody(AkkaStreams)(compressStream(b.asInstanceOf[Source[ByteString, Any]]))
19+
case FileBody(f, _) => StreamBody(AkkaStreams)(compressStream(FileIO.fromPath(f.toPath)))
20+
case _ => super.apply(body, encoding)
21+
}
22+
23+
def compressStream(stream: Source[ByteString, Any]): Source[ByteString, Any]
24+
}
25+
26+
class GZipAkkaCompressor[R <: AkkaStreams] extends GZipDefaultCompressor[R] with AkkaCompressor[R] {
27+
def compressStream(stream: Source[ByteString, Any]): Source[ByteString, Any] = stream.via(Compression.gzip)
28+
}
29+
30+
class DeflateAkkaCompressor[R <: AkkaStreams] extends DeflateDefaultCompressor[R] with AkkaCompressor[R] {
31+
def compressStream(stream: Source[ByteString, Any]): Source[ByteString, Any] = stream.via(Compression.deflate)
32+
}

akka-http-backend/src/main/scala/sttp/client4/akkahttp/AkkaHttpBackend.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import akka.stream.Materializer
1414
import akka.stream.scaladsl.{Flow, Sink}
1515
import sttp.capabilities.akka.AkkaStreams
1616
import sttp.capabilities.{Effect, WebSockets}
17-
import sttp.client4
1817
import sttp.client4.akkahttp.AkkaHttpBackend.EncodingHandler
1918
import sttp.client4.testing.WebSocketStreamBackendStub
2019
import sttp.client4._
@@ -50,9 +49,11 @@ class AkkaHttpBackend private (
5049
if (r.isWebSocket) sendWebSocket(r) else sendRegular(r)
5150
}
5251

52+
private val compressors = List(new GZipAkkaCompressor, new DeflateAkkaCompressor)
53+
5354
private def sendRegular[T](r: GenericRequest[T, R]): Future[Response[T]] =
5455
Future
55-
.fromTry(ToAkka.request(r).flatMap(BodyToAkka(r, r.body, _)))
56+
.fromTry(ToAkka.request(r).flatMap(BodyToAkka(r, _, compressors)))
5657
.map(customizeRequest)
5758
.flatMap(request =>
5859
http
@@ -136,7 +137,7 @@ class AkkaHttpBackend private (
136137
wsFlow.map(Right(_)).getOrElse(Left(decodeAkkaResponse(hr, r.autoDecompressionEnabled)))
137138
)
138139

139-
body.map(client4.Response(_, code, statusText, headers, Nil, r.onlyMetadata))
140+
body.map(sttp.client4.Response(_, code, statusText, headers, Nil, r.onlyMetadata))
140141
}
141142

142143
// http://doc.akka.io/docs/akka-http/10.0.7/scala/http/common/de-coding.html

akka-http-backend/src/main/scala/sttp/client4/akkahttp/BodyToAkka.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,26 +12,26 @@ import akka.http.scaladsl.model.{
1212
import akka.stream.scaladsl.{Source, StreamConverters}
1313
import akka.util.ByteString
1414
import sttp.capabilities.akka.AkkaStreams
15-
import sttp.client4.internal.throwNestedMultipartNotAllowed
1615
import sttp.client4._
17-
import sttp.model.{HeaderNames, Part}
16+
import sttp.model.Part
1817

1918
import scala.collection.immutable.Seq
2019
import scala.util.{Failure, Success, Try}
20+
import sttp.client4.compression.Compressor
2121

2222
private[akkahttp] object BodyToAkka {
2323
def apply[R](
2424
r: GenericRequest[_, R],
25-
body: GenericRequestBody[R],
26-
ar: HttpRequest
25+
ar: HttpRequest,
26+
compressors: List[Compressor[R]]
2727
): Try[HttpRequest] = {
2828
def ctWithCharset(ct: ContentType, charset: String) =
2929
HttpCharsets
3030
.getForKey(charset)
3131
.map(hc => ContentType.apply(ct.mediaType, () => hc))
3232
.getOrElse(ct)
3333

34-
def contentLength = r.headers.find(_.is(HeaderNames.ContentLength)).flatMap(h => Try(h.value.toLong).toOption)
34+
val (body, contentLength) = Compressor.compressIfNeeded(r, compressors)
3535

3636
def toBodyPart(mp: Part[BodyPart[_]]): Try[AkkaMultipart.FormData.BodyPart] = {
3737
def streamPartEntity(contentType: ContentType, s: AkkaStreams.BinaryStream) =

pekko-http-backend/src/main/scala/sttp/client4/pekkohttp/BodyToPekko.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,26 +13,26 @@ import pekko.http.scaladsl.model.{
1313
import pekko.stream.scaladsl.{Source, StreamConverters}
1414
import pekko.util.ByteString
1515
import sttp.capabilities.pekko.PekkoStreams
16-
import sttp.client4.internal.throwNestedMultipartNotAllowed
1716
import sttp.client4._
18-
import sttp.model.{HeaderNames, Part}
17+
import sttp.model.Part
1918

2019
import scala.collection.immutable.Seq
2120
import scala.util.{Failure, Success, Try}
21+
import sttp.client4.compression.Compressor
2222

2323
private[pekkohttp] object BodyToPekko {
2424
def apply[R](
2525
r: GenericRequest[_, R],
26-
body: GenericRequestBody[R],
27-
ar: HttpRequest
26+
ar: HttpRequest,
27+
compressors: List[Compressor[R]]
2828
): Try[HttpRequest] = {
2929
def ctWithCharset(ct: ContentType, charset: String) =
3030
HttpCharsets
3131
.getForKey(charset)
3232
.map(hc => ContentType.apply(ct.mediaType, () => hc))
3333
.getOrElse(ct)
3434

35-
def contentLength = r.headers.find(_.is(HeaderNames.ContentLength)).flatMap(h => Try(h.value.toLong).toOption)
35+
val (body, contentLength) = Compressor.compressIfNeeded(r, compressors)
3636

3737
def toBodyPart(mp: Part[BodyPart[_]]): Try[PekkoMultipart.FormData.BodyPart] = {
3838
def streamPartEntity(contentType: ContentType, s: PekkoStreams.BinaryStream) =
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package sttp.client4.pekkohttp
2+
3+
import org.apache.pekko.util.ByteString
4+
import org.apache.pekko.stream.scaladsl.Compression
5+
import sttp.capabilities.pekko.PekkoStreams
6+
import org.apache.pekko.stream.scaladsl.Source
7+
import sttp.client4._
8+
import sttp.client4.compression.DeflateDefaultCompressor
9+
import sttp.client4.compression.GZipDefaultCompressor
10+
import sttp.client4.compression.Compressor
11+
import org.apache.pekko.stream.scaladsl.StreamConverters
12+
import org.apache.pekko.stream.scaladsl.FileIO
13+
14+
trait PekkoCompressor[R <: PekkoStreams] extends Compressor[R] {
15+
override abstract def apply(body: GenericRequestBody[R], encoding: String): GenericRequestBody[R] =
16+
body match {
17+
case InputStreamBody(b, _) => StreamBody(PekkoStreams)(compressStream(StreamConverters.fromInputStream(() => b)))
18+
case StreamBody(b) => StreamBody(PekkoStreams)(compressStream(b.asInstanceOf[Source[ByteString, Any]]))
19+
case FileBody(f, _) => StreamBody(PekkoStreams)(compressStream(FileIO.fromPath(f.toPath)))
20+
case _ => super.apply(body, encoding)
21+
}
22+
23+
def compressStream(stream: Source[ByteString, Any]): Source[ByteString, Any]
24+
}
25+
26+
class GZipPekkoCompressor[R <: PekkoStreams] extends GZipDefaultCompressor[R] with PekkoCompressor[R] {
27+
def compressStream(stream: Source[ByteString, Any]): Source[ByteString, Any] = stream.via(Compression.gzip)
28+
}
29+
30+
class DeflatePekkoCompressor[R <: PekkoStreams] extends DeflateDefaultCompressor[R] with PekkoCompressor[R] {
31+
def compressStream(stream: Source[ByteString, Any]): Source[ByteString, Any] = stream.via(Compression.deflate)
32+
}

pekko-http-backend/src/main/scala/sttp/client4/pekkohttp/PekkoHttpBackend.scala

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,19 @@
11
package sttp.client4.pekkohttp
22

33
import java.io.UnsupportedEncodingException
4-
import org.apache.pekko
5-
import pekko.{Done, NotUsed}
6-
import pekko.actor.{ActorSystem, CoordinatedShutdown}
7-
import pekko.event.LoggingAdapter
8-
import pekko.http.scaladsl.coding.Coders
9-
import pekko.http.scaladsl.model.headers.{BasicHttpCredentials, HttpEncoding, HttpEncodings}
10-
import pekko.http.scaladsl.model.ws.{InvalidUpgradeResponse, Message, ValidUpgrade, WebSocketRequest}
11-
import pekko.http.scaladsl.model.{StatusCode => _, _}
12-
import pekko.http.scaladsl.settings.ConnectionPoolSettings
13-
import pekko.http.scaladsl.{ClientTransport, Http, HttpsConnectionContext}
14-
import pekko.stream.Materializer
15-
import pekko.stream.scaladsl.{Flow, Sink}
4+
import org.apache.pekko.{Done, NotUsed}
5+
import org.apache.pekko.actor.{ActorSystem, CoordinatedShutdown}
6+
import org.apache.pekko.event.LoggingAdapter
7+
import org.apache.pekko.http.scaladsl.coding.Coders
8+
import org.apache.pekko.http.scaladsl.model.headers.{BasicHttpCredentials, HttpEncoding, HttpEncodings}
9+
import org.apache.pekko.http.scaladsl.model.ws.{InvalidUpgradeResponse, Message, ValidUpgrade, WebSocketRequest}
10+
import org.apache.pekko.http.scaladsl.model.{StatusCode => _, _}
11+
import org.apache.pekko.http.scaladsl.settings.ConnectionPoolSettings
12+
import org.apache.pekko.http.scaladsl.{ClientTransport, Http, HttpsConnectionContext}
13+
import org.apache.pekko.stream.Materializer
14+
import org.apache.pekko.stream.scaladsl.{Flow, Sink}
1615
import sttp.capabilities.pekko.PekkoStreams
1716
import sttp.capabilities.{Effect, WebSockets}
18-
import sttp.client4
1917
import sttp.client4.pekkohttp.PekkoHttpBackend.EncodingHandler
2018
import sttp.client4.testing.WebSocketStreamBackendStub
2119
import sttp.client4._
@@ -51,9 +49,11 @@ class PekkoHttpBackend private (
5149
if (r.isWebSocket) sendWebSocket(r) else sendRegular(r)
5250
}
5351

52+
private val compressors = List(new GZipPekkoCompressor, new DeflatePekkoCompressor)
53+
5454
private def sendRegular[T](r: GenericRequest[T, R]): Future[Response[T]] =
5555
Future
56-
.fromTry(ToPekko.request(r).flatMap(BodyToPekko(r, r.body, _)))
56+
.fromTry(ToPekko.request(r).flatMap(BodyToPekko(r, _, compressors)))
5757
.map(customizeRequest)
5858
.flatMap(request =>
5959
http
@@ -137,7 +137,7 @@ class PekkoHttpBackend private (
137137
wsFlow.map(Right(_)).getOrElse(Left(decodePekkoResponse(hr, r.autoDecompressionEnabled)))
138138
)
139139

140-
body.map(client4.Response(_, code, statusText, headers, Nil, r.onlyMetadata))
140+
body.map(sttp.client4.Response(_, code, statusText, headers, Nil, r.onlyMetadata))
141141
}
142142

143143
// http://doc.akka.io/docs/akka-http/10.0.7/scala/http/common/de-coding.html

0 commit comments

Comments
 (0)