Skip to content

Introduces Connection pool in compatible adapters #1006

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions faraday.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Gem::Specification.new do |spec|

spec.required_ruby_version = '>= 2.3'

spec.add_dependency 'connection_pool', '~> 2.2'
spec.add_dependency 'multipart-post', '>= 1.2', '< 3'

spec.require_paths = %w[lib spec/external_adapters]
Expand Down
6 changes: 6 additions & 0 deletions lib/faraday/adapter.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

require 'faraday/adapter/concerns/connection_pooling'

module Faraday
# Base class for all Faraday adapters. Adapters are
# responsible for fulfilling a Faraday request.
Expand Down Expand Up @@ -40,10 +42,14 @@ def inherited(subclass)
extend Parallelism
self.supports_parallel = false

include ConnectionPooling
self.supports_pooling = false

def initialize(_app = nil, opts = {}, &block)
@app = ->(env) { env.response }
@connection_options = opts
@config_block = block
initialize_pool(opts[:pool] || {}) if self.class.supports_pooling
end

def call(env)
Expand Down
38 changes: 38 additions & 0 deletions lib/faraday/adapter/concerns/connection_pooling.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# frozen_string_literal: true

# This module marks an Adapter as supporting connection pooling.
module ConnectionPooling
def self.included(base)
base.extend(ClassMethods)
end

# Class methods injected into the class including this module.
module ClassMethods
attr_accessor :supports_pooling

def inherited(subclass)
super
subclass.supports_pooling = supports_pooling
end
end

attr_reader :pool

MISSING_CONNECTION_ERROR = 'You need to define a `connection` method' \
'in order to support connection pooling!'

# Initializes the connection pool.
#
# @param opts [Hash] the options to pass to `ConnectionPool` initializer.
def initialize_pool(opts = {})
ensure_connection_defined!
@pool = ConnectionPool.new(opts, &method(:connection))
end

# Checks if `connection` method exists and raises an error otherwise.
def ensure_connection_defined!
return if self.class.method_defined?(:connection)

raise NoMethodError, MISSING_CONNECTION_ERROR
end
end
103 changes: 52 additions & 51 deletions lib/faraday/adapter/httpclient.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,19 @@ class Adapter
class HTTPClient < Faraday::Adapter
dependency 'httpclient'

# @return [HTTPClient]
def client
@client ||= ::HTTPClient.new
self.supports_pooling = true

def connection
::HTTPClient.new
end

def call(env)
super

# enable compression
client.transparent_gzip_decompression = true

if (req = env[:request])
if (proxy = req[:proxy])
configure_proxy proxy
end

if (bind = req[:bind])
configure_socket bind
end

configure_timeouts req
end

if env[:url].scheme == 'https' && (ssl = env[:ssl])
configure_ssl ssl
resp = pool.with do |client|
perform_request(client, env)
end

configure_client

# TODO: Don't stream yet.
# https://github.com/nahi/httpclient/pull/90
env[:body] = env[:body].read if env[:body].respond_to? :read

resp = client.request env[:method], env[:url],
body: env[:body],
header: env[:request_headers]

if (req = env[:request]).stream_response?
warn "Streaming downloads for #{self.class.name} " \
'are not yet implemented.'
Expand Down Expand Up @@ -70,54 +46,79 @@ def call(env)
raise
end

def perform_request(client, env)
# enable compression
client.transparent_gzip_decompression = true

if (req = env[:request])
configure_proxy(client, req[:proxy]) if req[:proxy]
configure_socket(client, req[:bind]) if req[:bind]
configure_timeouts(client, req)
end

if env[:url].scheme == 'https' && (ssl = env[:ssl])
configure_ssl(client, ssl)
end

configure_client(client)

# TODO: Don't stream yet.
# https://github.com/nahi/httpclient/pull/90
env[:body] = env[:body].read if env[:body].respond_to? :read

client.request env[:method], env[:url],
body: env[:body],
header: env[:request_headers]
end

def configure_client(client)
@config_block&.call(client)
end

# @param bind [Hash]
def configure_socket(bind)
def configure_socket(client, bind)
client.socket_local.host = bind[:host]
client.socket_local.port = bind[:port]
end

# Configure proxy URI and any user credentials.
#
# @param proxy [Hash]
def configure_proxy(proxy)
def configure_proxy(client, proxy)
client.proxy = proxy[:uri]
return unless proxy[:user] && proxy[:password]

client.set_proxy_auth(proxy[:user], proxy[:password])
end

# @param ssl [Hash]
def configure_ssl(ssl)
def configure_ssl(client, ssl)
ssl_config = client.ssl_config
ssl_config.verify_mode = ssl_verify_mode(ssl)
ssl_config.cert_store = ssl_cert_store(ssl)

ssl_config.add_trust_ca ssl[:ca_file] if ssl[:ca_file]
ssl_config.add_trust_ca ssl[:ca_path] if ssl[:ca_path]
ssl_config.client_cert = ssl[:client_cert] if ssl[:client_cert]
ssl_config.client_key = ssl[:client_key] if ssl[:client_key]
ssl_config.add_trust_ca ssl[:ca_file] if ssl[:ca_file]
ssl_config.add_trust_ca ssl[:ca_path] if ssl[:ca_path]
ssl_config.client_cert = ssl[:client_cert] if ssl[:client_cert]
ssl_config.client_key = ssl[:client_key] if ssl[:client_key]
ssl_config.verify_depth = ssl[:verify_depth] if ssl[:verify_depth]
end

# @param req [Hash]
def configure_timeouts(req)
configure_timeout(req) if req[:timeout]
configure_open_timeout(req) if req[:open_timeout]
def configure_timeouts(client, req)
configure_timeout(client, req) if req[:timeout]
configure_open_timeout(client, req) if req[:open_timeout]
end

def configure_timeout(req)
client.connect_timeout = req[:timeout]
client.receive_timeout = req[:timeout]
client.send_timeout = req[:timeout]
def configure_timeout(client, req)
client.connect_timeout = req[:timeout]
client.receive_timeout = req[:timeout]
client.send_timeout = req[:timeout]
end

def configure_open_timeout(req)
client.connect_timeout = req[:open_timeout]
client.send_timeout = req[:open_timeout]
end

def configure_client
@config_block&.call(client)
def configure_open_timeout(client, req)
client.connect_timeout = req[:open_timeout]
client.send_timeout = req[:open_timeout]
end

# @param ssl [Hash]
Expand Down
66 changes: 38 additions & 28 deletions lib/faraday/adapter/patron.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,19 @@ class Adapter
class Patron < Faraday::Adapter
dependency 'patron'

self.supports_pooling = true
def connection
::Patron::Session.new
end

def call(env)
super
# TODO: support streaming requests
env[:body] = env[:body].read if env[:body].respond_to? :read

session = ::Patron::Session.new
@config_block&.call(session)
if (env[:url].scheme == 'https') && env[:ssl]
configure_ssl(session, env[:ssl])
end

if (req = env[:request])
if req[:timeout]
session.timeout = session.connect_timeout = req[:timeout]
end
session.connect_timeout = req[:open_timeout] if req[:open_timeout]

if (proxy = req[:proxy])
proxy_uri = proxy[:uri].dup
proxy_uri.user = proxy[:user] &&
Utils.escape(proxy[:user]).gsub('+', '%20')
proxy_uri.password = proxy[:password] &&
Utils.escape(proxy[:password]).gsub('+', '%20')
session.proxy = proxy_uri.to_s
end
end

response = begin
data = env[:body] ? env[:body].to_s : nil
session.request(env[:method], env[:url].to_s,
env[:request_headers], data: data)
rescue Errno::ECONNREFUSED, ::Patron::ConnectionFailed
raise Faraday::ConnectionFailed, $ERROR_INFO
response = pool.with do |session|
@config_block&.call(session)
perform_request(session, env)
end

if (req = env[:request]).stream_response?
Expand Down Expand Up @@ -83,6 +63,36 @@ def call(env)
end
end

def perform_request(session, env)
if (env[:url].scheme == 'https') && env[:ssl]
configure_ssl(session, env[:ssl])
end

if (req = env[:request])
if req[:timeout]
session.timeout = session.connect_timeout = req[:timeout]
end
session.connect_timeout = req[:open_timeout] if req[:open_timeout]

if (proxy = req[:proxy])
proxy_uri = proxy[:uri].dup
proxy_uri.user = proxy[:user] &&
Utils.escape(proxy[:user]).gsub('+', '%20')
proxy_uri.password = proxy[:password] &&
Utils.escape(proxy[:password]).gsub('+', '%20')
session.proxy = proxy_uri.to_s
end
end

begin
data = env[:body] ? env[:body].to_s : nil
session.request(env[:method], env[:url].to_s,
env[:request_headers], data: data)
rescue Errno::ECONNREFUSED, ::Patron::ConnectionFailed
raise Faraday::ConnectionFailed, $ERROR_INFO
end
end

def configure_ssl(session, ssl)
if ssl.fetch(:verify, true)
session.cacert = ssl[:ca_file]
Expand Down
11 changes: 6 additions & 5 deletions spec/faraday/adapter/httpclient_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

RSpec.describe Faraday::Adapter::HTTPClient do
features :request_body_on_query_methods, :reason_phrase_parse, :compression,
:trace_method, :connect_method, :local_socket_binding
:trace_method, :connect_method, :local_socket_binding, :pooling

it_behaves_like 'an adapter'

Expand All @@ -12,10 +12,11 @@
client.ssl_config.timeout = 25
end

client = adapter.client
adapter.configure_client
adapter.pool.with do |client|
adapter.configure_client(client)

expect(client.keep_alive_timeout).to eq(20)
expect(client.ssl_config.timeout).to eq(25)
expect(client.keep_alive_timeout).to eq(20)
expect(client.ssl_config.timeout).to eq(25)
end
end
end
2 changes: 1 addition & 1 deletion spec/faraday/adapter/patron_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# frozen_string_literal: true

RSpec.describe Faraday::Adapter::Patron do
features :reason_phrase_parse
features :reason_phrase_parse, :pooling

it_behaves_like 'an adapter'

Expand Down
31 changes: 31 additions & 0 deletions spec/support/shared_examples/request_method.rb
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,39 @@

it 'handles proxy failures' do
conn_options[:proxy] = 'http://google.co.uk'

request_stub.to_return(status: 407)

expect { conn.public_send(http_method, '/') }.to raise_error(Faraday::ProxyAuthError)
end

on_feature :pooling do
context 'accessing the pool' do
before do
@pool = nil
allow_any_instance_of(described_class).to receive(:pool).and_wrap_original do |m, *args|
@pool ||= m.call(*args)
end

request_stub.disable
end

it 'uses a connection_pool internally' do
# Injects expectation on request execution
request_stub.to_return do |_|
expect(@pool.available).to eq(@pool.size - 1)
{ body: '' }
end

conn.public_send(http_method, '/')
end

it 'passes pool options to the connection pool' do
adapter_options << { pool: { size: 3 } }

conn.public_send(http_method, '/')
expect(@pool.size).to eq(3)
end
end
end
end