Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 104 additions & 44 deletions chronos/apps/http/httpclient.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
{.push raises: [].}

import std/[uri, tables, sequtils]
import stew/[assign2, base10, base64, byteutils, ptrops, shims/sequninit], httputils, results
import stew/[assign2, base10, byteutils, ptrops, shims/sequninit], httputils, results
import ../../[asyncloop, asyncsync, config]
import ../../streams/[asyncstream, tlsstream, chunkstream, boundstream]
import httptable, httpcommon, httpagent, httpbodyrw, multipart
Expand Down Expand Up @@ -108,13 +108,19 @@ type
writer*: AsyncStreamWriter
state*: HttpClientConnectionState
error*: ref HttpError
remoteHostname*: string
remoteHostname*: string # sourced from HttpAddress.id
flags*: set[HttpClientConnectionFlag]
timestamp*: Moment
duration*: Duration

HttpClientConnectionRef* = ref HttpClientConnection

HttpConnectionProvider* = proc(
request: HttpClientRequestRef
): Future[HttpClientConnectionRef] {.
async: (raises: [CancelledError, HttpConnectionError])
.}

HttpSessionRef* = ref object
connections*: Table[string, seq[HttpClientConnectionRef]]
counter*: uint64
Expand All @@ -130,9 +136,10 @@ type
socketFlags*: set[SocketFlags]
flags*: HttpClientFlags
dualstack*: DualStackType
provider: HttpConnectionProvider

HttpAddress* = object
id*: string
id*: string # hostname:port
scheme*: HttpClientScheme
hostname*: string
port*: uint16
Expand Down Expand Up @@ -258,7 +265,7 @@ template isIdle(conn: HttpClientConnectionRef, timestamp: Moment,
(timestamp - conn.timestamp) >= timeout or conn.transp.atEof()

proc sessionWatcher(session: HttpSessionRef) {.async: (raises: []).}

proc directProvider*(): HttpConnectionProvider
proc new*(t: typedesc[HttpSessionRef],
flags: HttpClientFlags = {},
maxRedirections = HttpMaxRedirections,
Expand All @@ -269,7 +276,8 @@ proc new*(t: typedesc[HttpSessionRef],
idleTimeout = HttpConnectionIdleTimeout,
idlePeriod = HttpConnectionCheckPeriod,
socketFlags: set[SocketFlags] = {},
dualstack = DualStackType.Auto): HttpSessionRef =
dualstack = DualStackType.Auto,
provider: HttpConnectionProvider = nil,): HttpSessionRef =
## Create new HTTP session object.
##
## ``maxRedirections`` - maximum number of HTTP 3xx redirections
Expand All @@ -278,7 +286,7 @@ proc new*(t: typedesc[HttpSessionRef],
## ``idleTimeout`` - timeout to consider HTTP connection as idle
## ``idlePeriod`` - period of time to check HTTP connections for inactivity
doAssert(maxRedirections >= 0, "maxRedirections should not be negative")
var res = HttpSessionRef(
let res = HttpSessionRef(
flags: flags,
maxRedirections: maxRedirections,
connectTimeout: connectTimeout,
Expand All @@ -289,7 +297,12 @@ proc new*(t: typedesc[HttpSessionRef],
idlePeriod: idlePeriod,
connections: initTable[string, seq[HttpClientConnectionRef]](),
socketFlags: socketFlags,
dualstack: dualstack
dualstack: dualstack,
provider:
if provider.isNil:
directProvider()
else:
provider
)
res.watcherFut =
if HttpClientFlag.Http11Pipeline in flags:
Expand Down Expand Up @@ -571,6 +584,7 @@ proc new(
flags = session.flags.getTLSFlags(),
bufferSize = session.connectionBufferSize)
except TLSStreamInitError as exc:
# TODO treader and twriter leak here
return err(exc.msg)

res = HttpClientConnectionRef(
Expand Down Expand Up @@ -626,23 +640,21 @@ proc closeWait(conn: HttpClientConnectionRef) {.async: (raises: []).} =
proc connect(session: HttpSessionRef,
ha: HttpAddress): Future[HttpClientConnectionRef] {.
async: (raises: [CancelledError, HttpConnectionError]).} =
## Establish new connection with remote server using ``url`` and ``flags``.
## Establish new connection with remote server using ``ha``.
## On success returns ``HttpClientConnectionRef`` object.
var lastError = ""
# Here we trying to connect to every possible remote host address we got after
# DNS resolution.
for address in ha.addresses:
let transp =
try:
try:
let transp =
await connect(address, bufferSize = session.connectionBufferSize,
flags = session.socketFlags,
dualstack = session.dualstack)
except TransportError:
nil
if not(isNil(transp)):
let conn =
block:
let res = HttpClientConnectionRef.new(session, ha, transp).valueOr:
await transp.closeWait()
raiseHttpConnectionError(
"Could not connect to remote host, reason: " & error)
if res.kind == HttpClientScheme.Secure:
Expand All @@ -665,26 +677,29 @@ proc connect(session: HttpSessionRef,
res
if conn.state == HttpClientConnectionState.Ready:
return conn
except TransportError as exc:
lastError = exc.msg

# If all attempts to connect to the remote host have failed.
if len(lastError) > 0:
raiseHttpConnectionError("Could not connect to remote host, reason: " &
raiseHttpConnectionError("Could not connect to remote host: " &
lastError)
else:
raiseHttpConnectionError("Could not connect to remote host")

proc removeConnection(session: HttpSessionRef,
conn: HttpClientConnectionRef) {.async: (raises: []).} =
conn: HttpClientConnectionRef,
ha: HttpAddress) {.async: (raises: []).} =
let removeHost =
block:
var res = false
session.connections.withValue(conn.remoteHostname, connections):
session.connections.withValue(ha.id, connections):
connections[].keepItIf(it != conn)
if len(connections[]) == 0:
res = true
res
if removeHost:
session.connections.del(conn.remoteHostname)
session.connections.del(ha.id)
dec(session.connectionsCount)
await conn.closeWait()

Expand Down Expand Up @@ -726,7 +741,8 @@ proc acquireConnection(
connection

proc releaseConnection(session: HttpSessionRef,
connection: HttpClientConnectionRef) {.
connection: HttpClientConnectionRef,
ha: HttpAddress) {.
async: (raises: []).} =
## Return connection back to the ``session``.
let removeConnection =
Expand Down Expand Up @@ -758,7 +774,7 @@ proc releaseConnection(session: HttpSessionRef,
true

if removeConnection:
await session.removeConnection(connection)
await session.removeConnection(connection, ha)
else:
connection.state = HttpClientConnectionState.Ready
connection.flags.excl({HttpClientConnectionFlag.Request,
Expand All @@ -775,7 +791,7 @@ proc releaseConnection(request: HttpClientRequestRef) {.async: (raises: []).} =
request.session = nil
connection.flags.excl(HttpClientConnectionFlag.Request)
if HttpClientConnectionFlag.Response notin connection.flags:
await session.releaseConnection(connection)
await session.releaseConnection(connection, request.address)

proc releaseConnection(response: HttpClientResponseRef) {.
async: (raises: []).} =
Expand All @@ -788,7 +804,42 @@ proc releaseConnection(response: HttpClientResponseRef) {.
response.session = nil
connection.flags.excl(HttpClientConnectionFlag.Response)
if HttpClientConnectionFlag.Request notin connection.flags:
await session.releaseConnection(connection)
await session.releaseConnection(connection, response.address)


proc directProvider*(): HttpConnectionProvider =
## Return a connection provider that supplies connections directly to the
## requested address.
return proc(
request: HttpClientRequestRef
): Future[HttpClientConnectionRef] {.
async: (raises: [CancelledError, HttpConnectionError], raw: true)
.} =
request.session.acquireConnection(request.address, request.flags)

proc httpProxyProvider*(uri: Uri): HttpConnectionProvider =
## Return a connection provider that supplies connections via a forwarding
## HTTP proxy.
##
## The connection to the proxy can be established via TLS enabling the use
## of secure proxies.
return proc(
request: HttpClientRequestRef
): Future[HttpClientConnectionRef] {.
async: (raises: [CancelledError, HttpConnectionError])
.} =
# Resolve the proxy on every connection attempt
let ha = request.session.getAddress(uri).valueOr:
raiseHttpConnectionError(error)

if (len(ha.username) > 0 or len(ha.password) > 0) and
ProxyAuthorizationHeader notin request.headers:
request.headers.add(
ProxyAuthorizationHeader,
encodeBasicAuth(ha.username, ha.password),
)

await request.session.acquireConnection(ha, request.flags)

proc closeWait*(session: HttpSessionRef) {.async: (raises: []).} =
## Closes HTTP session object.
Expand Down Expand Up @@ -1128,12 +1179,12 @@ proc prepareRequest(request: HttpClientRequestRef): string =

# We will send `Authorization` information only if username or password set,
# and `Authorization` header is not present in request's headers.
if len(request.address.username) > 0 or len(request.address.password) > 0:
if AuthorizationHeader notin request.headers:
let auth = request.address.username & ":" & request.address.password
let header = "Basic " &
Base64Pad.encode(auth.toOpenArrayByte(0, len(auth) - 1))
request.headers.add(AuthorizationHeader, header)
if (len(request.address.username) > 0 or len(request.address.password) > 0) and
AuthorizationHeader notin request.headers:
request.headers.add(
AuthorizationHeader,
encodeBasicAuth(request.address.username, request.address.password),
)

# Here we perform automatic detection: if request was created with non-zero
# body and `Content-Length` header is missing we will create one with size
Expand All @@ -1152,24 +1203,33 @@ proc prepareRequest(request: HttpClientRequestRef): string =
else:
HttpClientBodyFlag.Custom

let entity =
block:
var res =
if len(request.address.path) > 0:
request.address.path
else:
"/"
if len(request.address.query) > 0:
res.add("?")
res.add(request.address.query)
if len(request.address.anchor) > 0:
res.add("#")
res.add(request.address.anchor)
res

# https://www.rfc-editor.org/info/rfc9112/#section-3
var res = $request.meth
res.add(" ")
res.add(entity)

if request.connection.remoteHostname != request.address.id:
# The connection is a proxy - use absolute-form
let scheme =
case request.address.scheme
of HttpClientScheme.NonSecure: "http"
of HttpClientScheme.Secure: "https"

res.add scheme
res.add "://"
res.add $request.address.id

if len(request.address.path) > 0:
res.add(request.address.path)
else:
res.add("/")

if len(request.address.query) > 0:
res.add("?")
res.add(request.address.query)
if len(request.address.anchor) > 0:
res.add("#")
res.add(request.address.anchor)

res.add(" ")
res.add($request.version)
res.add("\r\n")
Expand All @@ -1188,7 +1248,7 @@ proc send*(request: HttpClientRequestRef): Future[HttpClientResponseRef] {.
"Request's state is " & $request.state)
let connection =
try:
await request.session.acquireConnection(request.address, request.flags)
await request.session.provider(request)
except CancelledError as exc:
request.setError(newHttpInterruptError())
raise exc
Expand Down
7 changes: 6 additions & 1 deletion chronos/apps/http/httpcommon.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
{.push raises: [].}

import std/[strutils, uri]
import results, httputils
import results, httputils, stew/base64
import ../../asyncloop, ../../asyncsync
import ../../streams/[asyncstream, boundstream]
export asyncloop, asyncsync, results, httputils, strutils
Expand Down Expand Up @@ -43,6 +43,7 @@ const
ServerHeader* = "server"
LocationHeader* = "location"
AuthorizationHeader* = "authorization"
ProxyAuthorizationHeader* = "proxy-authorization"
ContentDispositionHeader* = "content-disposition"

UrlEncodedContentType* = MediaType.init("application/x-www-form-urlencoded")
Expand Down Expand Up @@ -354,3 +355,7 @@ func stringToBytes*(src: openArray[char]): seq[byte] =
dst
else:
default

func encodeBasicAuth*(username, password: string): string =
let auth = username & ":" & password
"Basic " & Base64Pad.encode(auth.toOpenArrayByte(0, high(auth)))
47 changes: 47 additions & 0 deletions tests/testhttpclient.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1436,6 +1436,53 @@ suite "HTTP client testing suite":
test "HTTP client server-sent events test":
check waitFor(testServerSentEvents(false)) == true

proc testHttpProxyConnectionProvider(): Future[bool] {.async.} =
const
targetHost = "127.0.0.1"
targetPort = 12345'u16
targetPath = "/test/proxy"
var proxyInvocationCount = 0

proc process(r: RequestFence): Future[HttpResponseRef] {.
async: (raises: [CancelledError]).} =
if r.isOk():
let request = r.get()
try:
check request.uri.scheme == "http"
check request.uri.hostname == targetHost
check request.uri.port == $targetPort
check request.uri.path == targetPath
inc(proxyInvocationCount)
await request.respond(Http200, "proxy-ok")
except HttpWriteError as exc:
defaultResponse(exc)
else:
defaultResponse(r.error())

var proxyServer = createServer(initTAddress("127.0.0.1:0"), process, false)
proxyServer.start()
let proxyAddress = proxyServer.instance.localAddress()
let proxyUri = parseUri("http://" & $proxyAddress)
var session = HttpSessionRef.new(provider = httpProxyProvider(proxyUri))
let targetUrl = "http://" & targetHost & ":" & $targetPort & targetPath
let targetAddress = session.getAddress(parseUri(targetUrl)).valueOr:
raise newException(ValueError, "Invalid target address")
var request = HttpClientRequestRef.new(session, targetAddress)
let response = await send(request)
check response.status == 200
let body = await response.getBodyBytes()
check string.fromBytes(body) == "proxy-ok"
await response.closeWait()
await request.closeWait()
await session.closeWait()
await proxyServer.stop()
await proxyServer.closeWait()
check proxyInvocationCount == 1
return true

test "HTTP proxy connection provider test":
check waitFor(testHttpProxyConnectionProvider()) == true

test "HTTP getHttpAddress() test":
block:
# HTTP client supports only `http` and `https` schemes in URL.
Expand Down