Skip to content

Commit c73a986

Browse files
committed
Implemented CurlMultiSocket
This implementation uses curl multi socket drive, and uses FileDescriptorPoller from cats effect. It allows using this library alongside other cats effect libraries.
1 parent 235d969 commit c73a986

File tree

11 files changed

+681
-10
lines changed

11 files changed

+681
-10
lines changed

curl/src/main/scala/org/http4s/curl/http/CurlClient.scala

+3
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@ package org.http4s.curl.http
1919
import cats.effect._
2020
import org.http4s.client.Client
2121
import org.http4s.curl.unsafe.CurlExecutorScheduler
22+
import org.http4s.curl.unsafe.CurlMultiSocket
2223

2324
private[curl] object CurlClient {
2425
def apply(ec: CurlExecutorScheduler): Client[IO] = Client(CurlRequest(ec, _))
2526

27+
def multiSocket(ms: CurlMultiSocket): Client[IO] = Client(CurlRequest.applyMultiSocket(ms, _))
28+
2629
def get: IO[Client[IO]] = IO.executionContext.flatMap {
2730
case ec: CurlExecutorScheduler => IO.pure(apply(ec))
2831
case _ => IO.raiseError(new RuntimeException("Not running on CurlExecutorScheduler"))

curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala

+65
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.http4s.Response
2222
import org.http4s.curl.internal.Utils
2323
import org.http4s.curl.internal._
2424
import org.http4s.curl.unsafe.CurlExecutorScheduler
25+
import org.http4s.curl.unsafe.CurlMultiSocket
2526

2627
private[curl] object CurlRequest {
2728
private def setup(
@@ -78,6 +79,57 @@ private[curl] object CurlRequest {
7879
)
7980
)
8081

82+
private def setup(
83+
handle: CurlEasy,
84+
send: RequestSend,
85+
recv: RequestRecv,
86+
req: Request[IO],
87+
): Resource[IO, Unit] =
88+
Utils.newZone.flatMap(implicit zone =>
89+
CurlSList().evalMap(headers =>
90+
IO {
91+
// TODO add in options
92+
// handle.setVerbose(true)
93+
94+
import org.http4s.curl.unsafe.libcurl_const
95+
import scala.scalanative.unsafe._
96+
import org.http4s.Header
97+
import org.http4s.HttpVersion
98+
import org.typelevel.ci._
99+
100+
handle.setCustomRequest(toCString(req.method.renderString))
101+
102+
handle.setUpload(true)
103+
104+
handle.setUrl(toCString(req.uri.renderString))
105+
106+
val httpVersion = req.httpVersion match {
107+
case HttpVersion.`HTTP/1.0` => libcurl_const.CURL_HTTP_VERSION_1_0
108+
case HttpVersion.`HTTP/1.1` => libcurl_const.CURL_HTTP_VERSION_1_1
109+
case HttpVersion.`HTTP/2` => libcurl_const.CURL_HTTP_VERSION_2
110+
case HttpVersion.`HTTP/3` => libcurl_const.CURL_HTTP_VERSION_3
111+
case _ => libcurl_const.CURL_HTTP_VERSION_NONE
112+
}
113+
handle.setHttpVersion(httpVersion)
114+
115+
req.headers // curl adds these headers automatically, so we explicitly disable them
116+
.transform(Header.Raw(ci"Expect", "") :: Header.Raw(ci"Transfer-Encoding", "") :: _)
117+
.foreach(header => headers.append(header.toString))
118+
119+
handle.setHttpHeader(headers.toPtr)
120+
121+
handle.setReadData(Utils.toPtr(send))
122+
handle.setReadFunction(RequestSend.readCallback(_, _, _, _))
123+
124+
handle.setHeaderData(Utils.toPtr(recv))
125+
handle.setHeaderFunction(RequestRecv.headerCallback(_, _, _, _))
126+
127+
handle.setWriteData(Utils.toPtr(recv))
128+
handle.setWriteFunction(RequestRecv.writeCallback(_, _, _, _))
129+
}
130+
)
131+
)
132+
81133
def apply(ec: CurlExecutorScheduler, req: Request[IO]): Resource[IO, Response[IO]] = for {
82134
gc <- GCRoot()
83135
handle <- CurlEasy()
@@ -89,4 +141,17 @@ private[curl] object CurlRequest {
89141
_ <- req.body.through(send.pipe).compile.drain.background
90142
resp <- recv.response()
91143
} yield resp
144+
145+
def applyMultiSocket(ms: CurlMultiSocket, req: Request[IO]): Resource[IO, Response[IO]] = for {
146+
gc <- GCRoot()
147+
handle <- CurlEasy()
148+
flow <- FlowControl(handle)
149+
send <- RequestSend(flow)
150+
recv <- RequestRecv(flow)
151+
_ <- gc.add(send, recv)
152+
_ <- setup(handle, send, recv, req)
153+
_ <- ms.addHandlerTerminating(handle, recv.onTerminated).toResource
154+
_ <- req.body.through(send.pipe).compile.drain.background
155+
resp <- recv.response()
156+
} yield resp
92157
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2022 http4s.org
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.http4s.curl.unsafe
18+
19+
import org.http4s.curl.CurlError
20+
21+
import scala.scalanative.unsafe._
22+
23+
final private[curl] case class CURLMcode(value: CInt) extends AnyVal {
24+
@inline def isOk: Boolean = value == 0
25+
@inline def isError: Boolean = value != 0
26+
@inline def throwOnError: Unit =
27+
if (isError) {
28+
throw CurlError.fromMCode(this)
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2022 http4s.org
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.http4s.curl.unsafe
18+
19+
import org.http4s.curl.CurlError
20+
21+
import scala.scalanative.unsafe._
22+
23+
final private[curl] case class CURLcode(value: CInt) extends AnyVal {
24+
@inline def isOk: Boolean = value == 0
25+
@inline def isError: Boolean = value != 0
26+
@inline def throwOnError: Unit =
27+
if (isError) {
28+
throw CurlError.fromCode(this)
29+
}
30+
}

curl/src/main/scala/org/http4s/curl/unsafe/CurlExecutorScheduler.scala

+2
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@ import cats.effect.kernel.Resource
2121
import cats.effect.unsafe.PollingExecutorScheduler
2222
import org.http4s.curl.CurlError
2323

24+
import scala.annotation.nowarn
2425
import scala.collection.mutable
2526
import scala.concurrent.duration.Duration
2627
import scala.scalanative.unsafe._
2728
import scala.scalanative.unsigned._
2829

30+
@nowarn
2931
final private[curl] class CurlExecutorScheduler(multiHandle: Ptr[libcurl.CURLM], pollEvery: Int)
3032
extends PollingExecutorScheduler(pollEvery) {
3133

0 commit comments

Comments
 (0)