Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/invidious.cr
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@
reinitialize_proxy: false
) do
companion = CONFIG.invidious_companion.sample
next make_client(companion.private_url, use_http_proxy: false)
next make_client(companion.private_url, use_http_proxy: false, allow_auto_reconnect: false)
end

# CLI
Expand Down Expand Up @@ -269,7 +269,7 @@
Kemal.config.env = "production" if !ENV.has_key?("KEMAL_ENV")
{% end %}

Kemal.run do |config|

Check failure on line 272 in src/invidious.cr

View workflow job for this annotation

GitHub Actions / build - crystal: nightly, stable: false

instantiating 'Kemal.run()'
if socket_binding = CONFIG.socket_binding
File.delete?(socket_binding.path)
# Create a socket and set its desired permissions
Expand Down
102 changes: 95 additions & 7 deletions src/invidious/connection/client.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,78 @@
# SSL contexts are designed to be reused and support an unlimited number of connections.
#
# This *significantly* improves the performance of spinning up new clients/reconnections
#
# For more information see https://github.com/crystal-lang/crystal/issues/15419
private GLOBAL_SSL_CONTEXT = OpenSSL::SSL::Context::Client.new

module Invidious
# A `TCPSocket` whose constructor accepts an address *family*, so that name
# resolution can be restricted to that family when the socket is opened.
class IVTCPSocket < TCPSocket
# Resolves *host*/*port* through `Addrinfo.tcp`, passing *dns_timeout* and
# *family* to the lookup, and connects to a resolved address using
# *connect_timeout*.
def initialize(host : String, port, dns_timeout = nil, connect_timeout = nil, blocking = false, family = Socket::Family::UNSPEC)
Addrinfo.tcp(host, port, timeout: dns_timeout, family: family) do |addrinfo|
super(addrinfo.family, addrinfo.type, addrinfo.protocol, blocking)
connect(addrinfo, timeout: connect_timeout) do |error|
# The connection attempt failed: close this socket and hand the error back
# to `Addrinfo.tcp` as the block's value.
# NOTE(review): presumably `Addrinfo.tcp` then tries the next resolved
# address or raises - confirm against the stdlib.
close
error
end
end
end
end

class HTTPClient < HTTP::Client
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does subclass HTTP::Client, but it's more for the convenience of using the two protected class methods than out of any actual necessity.

# Builds a client for *uri* by delegating to the regular `HTTP::Client`
# constructor.
#
# *tls* is normalised through `HTTP::Client.tls_flag` and the host is taken from
# `HTTP::Client.validate_host`. *allow_auto_reconnect* is stored in `@reconnect`
# after the parent constructor has run.
def initialize(uri : URI, tls : TLSContext = nil, allow_auto_reconnect : Bool = true)
  resolved_tls = HTTP::Client.tls_flag(uri, tls)
  resolved_host = HTTP::Client.validate_host(uri)

  super(resolved_host, uri.port, resolved_tls)

  @reconnect = allow_auto_reconnect
end

# Builds a client for *uri* and opens its connection immediately, restricting
# name resolution to the *force_resolve* address family.
#
# Unlike the other overload, this one does not call the parent constructor: it
# assigns `@tls`, `@host`, `@port` and `@io` itself, and creates the socket
# through `IVTCPSocket` so the address family can be chosen.
#
# `@reconnect` is set to `false`, so this client is never reconnected
# automatically.
# NOTE(review): presumably because the stock reconnect path would open a socket
# without the forced address family - confirm.
#
# Raises if TLS is requested in a build compiled with `-D without_openssl`.
def initialize(uri : URI, tls : TLSContext = nil, force_resolve : Socket::Family = Socket::Family::UNSPEC)
  tls = HTTP::Client.tls_flag(uri, tls)

  {% if flag?(:without_openssl) %}
    if tls
      raise "HTTP::Client TLS is disabled because `-D without_openssl` was passed at compile time"
    end
    @tls = nil
  {% else %}
    @tls = case tls
           when true
             OpenSSL::SSL::Context::Client.new
           when OpenSSL::SSL::Context::Client
             tls
           when false, nil
             nil
           end
  {% end %}

  @host = HTTP::Client.validate_host(uri)
  @port = (uri.port || (@tls ? 443 : 80)).to_i

  # The host of a URI keeps the square brackets around an IPv6 literal.
  # Strip them before resolving, as the previous `HTTP::Client#io` override did.
  hostname = @host.starts_with?('[') && @host.ends_with?(']') ? @host[1..-2] : @host

  tcp_socket = IVTCPSocket.new(
    host: hostname,
    port: @port,
    family: force_resolve,
  )

  # Keep writes buffered, matching the `io.sync = false` that the previous
  # `HTTP::Client#io` override applied to every socket it created.
  tcp_socket.sync = false

  if tls = @tls
    begin
      @io = OpenSSL::SSL::Socket::Client.new(tcp_socket, context: tls, sync_close: true, hostname: @host.rchop('.'))
    rescue ex
      # Don't leak the TCP socket when the SSL connection failed
      tcp_socket.close
      raise ex
    end
  else
    @io = tcp_socket
  end

  @reconnect = false
end
end
end

def add_yt_headers(request)
request.headers.delete("User-Agent") if request.headers["User-Agent"] == "Crystal"
request.headers["User-Agent"] ||= "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
Expand All @@ -13,14 +88,27 @@ def add_yt_headers(request)
end
end

def make_client(url : URI, region = nil, force_resolve : Bool = false, force_youtube_headers : Bool = false, use_http_proxy : Bool = true)
client = HTTP::Client.new(url)
client.proxy = make_configured_http_proxy_client() if CONFIG.http_proxy && use_http_proxy
def make_client(
url : URI,
region = nil,
force_resolve : Bool = false,
force_youtube_headers : Bool = true,
use_http_proxy : Bool = true,
allow_auto_reconnect : Bool = true,
)
tls = if url.scheme == "https"
GLOBAL_SSL_CONTEXT
else
nil
end

# Force the usage of a specific configured IP Family
if force_resolve
client.family = CONFIG.force_resolve
client.family = Socket::Family::INET if client.family == Socket::Family::UNSPEC
if CONFIG.http_proxy && use_http_proxy
client = Invidious::HTTPClient.new(url, tls: tls)
client.proxy = make_configured_http_proxy_client() if CONFIG.http_proxy && use_http_proxy
elsif force_resolve
client = Invidious::HTTPClient.new(url, tls: tls, force_resolve: CONFIG.force_resolve)
else
client = Invidious::HTTPClient.new(url, tls: tls, allow_auto_reconnect: allow_auto_reconnect)
end

client.before_request { |r| add_yt_headers(r) } if url.host.try &.ends_with?("youtube.com") || force_youtube_headers
Expand Down
106 changes: 73 additions & 33 deletions src/invidious/connection/pool.cr
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ module Invidious::ConnectionPool
# Streaming API for {{method.id.upcase}} request.
# The response will have its body as an `IO` accessed via `HTTP::Client::Response#body_io`.
def {{method.id}}(*args, **kwargs, &)
self.checkout do | client |
self.checkout_with_retry do | client |
client.{{method.id}}(*args, **kwargs) do | response |
result = yield response
return result
return yield response
ensure
response.body_io?.try &.skip_to_end
end
Expand All @@ -38,45 +37,82 @@ module Invidious::ConnectionPool
# Executes a {{method.id.upcase}} request.
# The response will have its body as a `String`, accessed via `HTTP::Client::Response#body`.
def {{method.id}}(*args, **kwargs)
self.checkout do | client |
self.checkout_with_retry do | client |
return client.{{method.id}}(*args, **kwargs)
end
end
{% end %}

# Checks out a client in the pool
#
# This method will NOT delete a client that has errored from the pool.
# Use `#checkout_with_retry` to ensure that the pool does not get poisoned.
def checkout(&)
# If a client has been deleted from the pool
# we won't try to release it
client_exists_in_pool = true

http_client = pool.checkout

# When the HTTP::Client connection is closed, the automatic reconnection
# feature will create a new IO to connect to the server with
#
# This new TCP IO will be a direct connection to the server and will not go
# through the proxy. As such we'll need to reinitialize the proxy connection

http_client.proxy = make_configured_http_proxy_client() if @reinitialize_proxy && CONFIG.http_proxy

response = yield http_client
rescue ex : DB::PoolTimeout
# Failed to checkout a client
raise ConnectionPool::PoolCheckoutError.new(ex.message)
rescue ex
# An error occurred with the client itself.
# Delete the client from the pool and close the connection
if http_client
client_exists_in_pool = false
@pool.delete(http_client)
http_client.close
pool.checkout do |client|
# When the HTTP::Client connection is closed, the automatic reconnection
# feature will create a new IO to connect to the server with
#
# This new TCP IO will be a direct connection to the server and will not go
# through the proxy. As such we'll need to reinitialize the proxy connection
client.proxy = make_configured_http_proxy_client() if @reinitialize_proxy && CONFIG.http_proxy

response = yield client

return response
rescue ex : DB::PoolTimeout
# Failed to checkout a client
raise ConnectionPool::PoolCheckoutError.new(ex.message)
end
end

# Raise exception for outer methods to handle
raise ConnectionPool::Error.new(ex.message, cause: ex)
ensure
pool.release(http_client) if http_client && client_exists_in_pool
# Checks out a client from the pool, retrying only when a connection is lost or refused.
#
# Existing connections are cycled through without delay, but any newly created
# connection is subject to a delay.
#
# The first attempt to make a new connection is not delayed, but all subsequent
# attempts are.
#
# How failures are signalled to `DB::Pool#retry`:
# - `DB::PoolResourceLost` means that the connection has been lost
#   and should be deleted from the pool.
#
# - `DB::PoolResourceRefused` means a new connection was intended to be created but
#   failed; the client can be safely released back into the pool to try again later.
#
# An `InfoException` raised by the block is re-raised untouched and is not retried.
#
# Raises `PoolRetryAttemptsExceeded` once `DB::Pool#retry` has run out of attempts.
#
# See the following code of `crystal-db` for more information
#
# https://github.com/crystal-lang/crystal-db/blob/023dc5de90c11927656fc747601c5f08ea3c906f/src/db/pool.cr#L191
# https://github.com/crystal-lang/crystal-db/blob/023dc5de90c11927656fc747601c5f08ea3c906f/src/db/pool_statement.cr#L41
# https://github.com/crystal-lang/crystal-db/blob/023dc5de90c11927656fc747601c5f08ea3c906f/src/db/pool_prepared_statement.cr#L13
#
def checkout_with_retry(&)
  @pool.retry do
    self.checkout do |client|
      begin
        return yield client
      rescue ex : IO::TimeoutError
        LOGGER.trace("Client: #{client} has failed to complete the request. Retrying with a new client")
        raise DB::PoolResourceRefused.new
      rescue ex : InfoException
        raise ex
      rescue ex : Exception
        # Any other error causes the client to be deleted from the pool

        # This message means that the client is closed and needs to be deleted
        # from the pool due to its inability to reconnect
        if ex.message == "This HTTP::Client cannot be reconnected"
          LOGGER.trace("Checked out client is closed and cannot be reconnected. Trying the next retry attempt...")
        else
          LOGGER.error("Client: #{client} has encountered an error: #{ex} #{ex.message} and will be removed from the pool")
        end

        raise DB::PoolResourceLost(HTTP::Client).new(client)
      end
    end
  end
rescue ex : DB::PoolRetryAttemptsExceeded
  # `DB::Pool#retry` raises this after its attempt loop, outside of the block it
  # yields to, so it has to be rescued at method level: a `rescue` attached to the
  # `retry` block itself would never see it.
  raise PoolRetryAttemptsExceeded.new
end
end

Expand All @@ -87,6 +123,10 @@ module Invidious::ConnectionPool
class PoolCheckoutError < Error
end

# Raised when a request has been retried too many times; the pool's own
# counterpart of `DB::PoolRetryAttemptsExceeded` (see `checkout_with_retry`).
class PoolRetryAttemptsExceeded < Error
end

# Mapping of subdomain => Invidious::ConnectionPool::Pool
# This is needed as we may need to access arbitrary subdomains of ytimg
private YTIMG_POOLS = {} of String => ConnectionPool::Pool
Expand Down
50 changes: 0 additions & 50 deletions src/invidious/helpers/crystal_class_overrides.cr
Original file line number Diff line number Diff line change
@@ -1,53 +1,3 @@
# Override of the TCPSocket and HTTP::Client classes in order to allow an
# IP family to be selected for domains that resolve to both IPv4 and
# IPv6 addresses.
#
class TCPSocket
def initialize(host, port, dns_timeout = nil, connect_timeout = nil, blocking = false, family = Socket::Family::UNSPEC)
Addrinfo.tcp(host, port, timeout: dns_timeout, family: family) do |addrinfo|
super(addrinfo.family, addrinfo.type, addrinfo.protocol, blocking)
connect(addrinfo, timeout: connect_timeout) do |error|
close
error
end
end
end
end

# :ditto:
class HTTP::Client
property family : Socket::Family = Socket::Family::UNSPEC

private def io
io = @io
return io if io
unless @reconnect
raise "This HTTP::Client cannot be reconnected"
end

hostname = @host.starts_with?('[') && @host.ends_with?(']') ? @host[1..-2] : @host
io = TCPSocket.new hostname, @port, @dns_timeout, @connect_timeout, family: @family
io.read_timeout = @read_timeout if @read_timeout
io.write_timeout = @write_timeout if @write_timeout
io.sync = false

{% if !flag?(:without_openssl) %}
if tls = @tls
tcp_socket = io
begin
io = OpenSSL::SSL::Socket::Client.new(tcp_socket, context: tls, sync_close: true, hostname: @host.rchop('.'))
rescue exc
# don't leak the TCP socket when the SSL connection failed
tcp_socket.close
raise exc
end
end
{% end %}

@io = io
end
end

# Mute the ClientError exception raised when a connection is flushed.
# This happens when the connection is unexpectedly closed by the client.
#
Expand Down
Loading