Skip to content

Commit 3dfeaa0

Browse files
committed
http/client: WIP connection pooling
1 parent 9bb493c commit 3dfeaa0

File tree

4 files changed

+174
-2
lines changed

4 files changed

+174
-2
lines changed

http-scm-0.rockspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ build = {
3030
modules = {
3131
["http.bit"] = "http/bit.lua";
3232
["http.client"] = "http/client.lua";
33+
["http.client_pool"] = "http/client_pool.lua";
3334
["http.connection_common"] = "http/connection_common.lua";
3435
["http.cookie"] = "http/cookie.lua";
3536
["http.h1_connection"] = "http/h1_connection.lua";

http/client.lua

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ local cqueues_dns = require "cqueues.dns"
66
local cqueues_dns_record = require "cqueues.dns.record"
77
local http_tls = require "http.tls"
88
local http_util = require "http.util"
9+
local http_client_pool = require "http.client_pool"
910
local connection_common = require "http.connection_common"
1011
local onerror = connection_common.onerror
1112
local new_h1_connection = require "http.h1_connection".new
@@ -166,6 +167,9 @@ local record_ipv4_mt = {
166167
__name = "http.client.record.ipv4";
167168
__index = record_ipv4_methods;
168169
}
170+
function record_ipv4_methods:pool_key()
171+
return http_client_pool.ipv4_pool_key(self.addr, self.port)
172+
end
169173
function records_methods:add_v4(addr, port)
170174
local n = self.n + 1
171175
self[n] = setmetatable({ addr = addr, port = port }, record_ipv4_mt)
@@ -179,6 +183,9 @@ local record_ipv6_mt = {
179183
__name = "http.client.record.ipv6";
180184
__index = record_ipv6_methods;
181185
}
186+
function record_ipv6_methods:pool_key()
187+
return http_client_pool.ipv6_pool_key(self.addr, self.port)
188+
end
182189
function records_methods:add_v6(addr, port)
183190
if type(addr) == "string" then
184191
-- Normalise
@@ -199,6 +206,9 @@ local record_unix_mt = {
199206
__name = "http.client.record.unix";
200207
__index = record_unix_methods;
201208
}
209+
function record_unix_methods:pool_key()
210+
return http_client_pool.unix_pool_key(self.path)
211+
end
202212
function records_methods:add_unix(path)
203213
local n = self.n + 1
204214
self[n] = setmetatable({ path = path }, record_unix_mt)
@@ -278,6 +288,19 @@ local function connect(options, timeout)
278288
return nil, lasterr, lasterrno
279289
end
280290

291+
local pool = options.pool
292+
if pool then
293+
for i=1, records.n do
294+
local dst_pool = pool[records[i]:pool_key()]
295+
if dst_pool then
296+
local c = http_client_pool.find_connection(dst_pool, options)
297+
if c then
298+
return c
299+
end
300+
end
301+
end
302+
end
303+
281304
local bind = options.bind
282305
if bind ~= nil then
283306
assert(type(bind) == "string")
@@ -325,6 +348,9 @@ local function connect(options, timeout)
325348
local ok
326349
ok, lasterr, lasterrno = c:connect(deadline and deadline-monotime())
327350
if ok then
351+
if pool then
352+
pool:add(c)
353+
end
328354
return c
329355
end
330356
c:close()

http/client_pool.lua

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
local cs = require "cqueues.socket"
2+
3+
local function reuse_connection(candidate, connect_options)
4+
-- Assume family/host/port/path already checked
5+
6+
if candidate.socket == nil then
7+
return false
8+
end
9+
10+
if connect_options.v6only then
11+
-- TODO
12+
return false
13+
end
14+
15+
local bind = connect_options.bind
16+
if bind then
17+
-- TODO: Use :localname()
18+
return false
19+
end
20+
21+
local version = connect_options.version
22+
if version and version ~= candidate.version then
23+
return false
24+
end
25+
26+
if candidate.version < 2 then
27+
-- Check if connection already in use (avoid pipelining)
28+
if candidate.req_locked then
29+
return false
30+
end
31+
elseif candidate.version == 2 then
32+
-- Check if http2 connection is nearing end of stream ids
33+
local highest_stream_id = math.max(candidate.highest_odd_stream, candidate.highest_even_stream)
34+
-- The stream id is a unsigned 31bit integer. we don't reuse if it's past half way
35+
if highest_stream_id > 0x3fffffff then
36+
return false
37+
end
38+
39+
local h2_settings = connect_options.h2_settings
40+
if h2_settings then
41+
-- TODO: check (and possibly change on connection?)
42+
return false
43+
end
44+
end
45+
46+
-- Do TLS check last, as it is the most expensive
47+
if connect_options.tls then
48+
-- TODO: compare TLS parameters
49+
return false
50+
end
51+
52+
-- Check to see if connection has been closed
53+
local ok, err = candidate.socket:fill(1, 0)
54+
if not ok and err == nil then
55+
-- has been closed
56+
return false
57+
end
58+
59+
return true
60+
end
61+
62+
local pool_methods = {}
63+
local pool_mt = {
64+
__name = "http.client.pool";
65+
__index = pool_methods;
66+
}
67+
68+
local function new_pool()
69+
return setmetatable({}, pool_mt)
70+
end
71+
72+
local function ipv4_pool_key(addr, port)
73+
return string.format("%d:%s:%s", cs.AF_INET, addr, port)
74+
end
75+
76+
local function ipv6_pool_key(addr, port)
77+
return string.format("%d:[%s]:%s", cs.AF_INET6, addr, port)
78+
end
79+
80+
local function unix_pool_key(path)
81+
return string.format("%d:%s", cs.AF_UNIX, path)
82+
end
83+
84+
local function connection_pool_key(connection)
85+
-- XXX: if using a proxy this may not be correct
86+
local family, a, b = connection:peername()
87+
if family == cs.AF_INET then
88+
return ipv4_pool_key(a, b)
89+
elseif family == cs.AF_INET6 then
90+
return ipv6_pool_key(a, b)
91+
elseif family == cs.AF_UNIX then
92+
return unix_pool_key(a)
93+
end
94+
end
95+
96+
function pool_methods:add(connection)
97+
local key = connection_pool_key(connection)
98+
if not key then
99+
return false
100+
end
101+
local dst_pool = self[key]
102+
if dst_pool == nil then
103+
dst_pool = {}
104+
self[key] = dst_pool
105+
end
106+
dst_pool[connection] = true
107+
return true
108+
end
109+
110+
function pool_methods:remove(connection)
111+
local key = connection_pool_key(connection)
112+
if not key then
113+
return true
114+
end
115+
local dst_pool = self[key]
116+
if dst_pool == nil then
117+
return true
118+
end
119+
dst_pool[connection] = nil
120+
if next(dst_pool) == nil then
121+
self[key] = nil
122+
end
123+
return true
124+
end
125+
126+
local function find_connection(dst_pool, connect_options)
127+
for connection in pairs(dst_pool) do
128+
if reuse_connection(connection, connect_options) then
129+
return connection
130+
end
131+
end
132+
return nil
133+
end
134+
135+
return {
136+
ipv4_pool_key = ipv4_pool_key;
137+
ipv6_pool_key = ipv6_pool_key;
138+
unix_pool_key = unix_pool_key;
139+
140+
new = new_pool;
141+
find_connection = find_connection;
142+
}

http/request.lua

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ local uri_patts = require "lpeg_patterns.uri"
44
local basexx = require "basexx"
55
local client = require "http.client"
66
local new_headers = require "http.headers".new
7+
local http_client_pool = require "http.client_pool"
78
local http_cookie = require "http.cookie"
89
local http_hsts = require "http.hsts"
910
local http_socks = require "http.socks"
@@ -17,6 +18,7 @@ local default_user_agent = string.format("%s/%s", http_version.name, http_versio
1718
local default_hsts_store = http_hsts.new_store()
1819
local default_proxies = http_proxies.new():update()
1920
local default_cookie_store = http_cookie.new_store()
21+
local default_connection_pool = http_client_pool.new()
2022

2123
local default_h2_settings = {
2224
ENABLE_PUSH = false;
@@ -26,6 +28,7 @@ local request_methods = {
2628
hsts = default_hsts_store;
2729
proxies = default_proxies;
2830
cookie_store = default_cookie_store;
31+
pool = default_connection_pool;
2932
is_top_level = true;
3033
site_for_cookies = nil;
3134
expect_100_timeout = 1;
@@ -125,6 +128,7 @@ function request_methods:clone()
125128
hsts = rawget(self, "hsts");
126129
proxies = rawget(self, "proxies");
127130
cookie_store = rawget(self, "cookie_store");
131+
pool = rawget(self, "pool");
128132
is_top_level = rawget(self, "is_top_level");
129133
site_for_cookies = rawget(self, "site_for_cookies");
130134
expect_100_timeout = rawget(self, "expect_100_timeout");
@@ -490,6 +494,7 @@ function request_methods:go(timeout)
490494
if not connection then
491495
local err, errno
492496
connection, err, errno = client.connect({
497+
pool = self.pool;
493498
host = host;
494499
port = port;
495500
bind = self.bind;
@@ -502,8 +507,6 @@ function request_methods:go(timeout)
502507
if connection == nil then
503508
return nil, err, errno
504509
end
505-
-- Close the connection (and free resources) when done
506-
connection:onidle(connection.close)
507510
end
508511

509512
local stream do

0 commit comments

Comments
 (0)