Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(starvation) prevent high-throughput reading from starving #142

Merged
merged 1 commit into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ test: certs
$(LUA) $(DELIM) $(PKGPATH) tests/request.lua 'https://www.google.nl' true
$(LUA) $(DELIM) $(PKGPATH) tests/semaphore.lua
$(LUA) $(DELIM) $(PKGPATH) tests/sleep.lua
$(LUA) $(DELIM) $(PKGPATH) tests/starve.lua
$(LUA) $(DELIM) $(PKGPATH) tests/tcptimeout.lua
$(LUA) $(DELIM) $(PKGPATH) tests/timer.lua
$(LUA) $(DELIM) $(PKGPATH) tests/tls-sni.lua
Expand Down
3 changes: 2 additions & 1 deletion docs/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ <h2><a name="history"></a>History</h2>
non-closed sockets. This could prevent the loop from exiting.</li>
<li>Fix: in debug mode very large data is now truncated when displayed.</li>
<li>Fix: order of <code>copas.addnamedthread</code> args.</li>
</ul></dd>
<li>Fix: the receive methods could starve other threads on high-throughput.</li>
</ul></dd>

<dt><strong>Copas 4.2.0</strong> [06/Sep/2022]</dt>
<dd><ul>
Expand Down
25 changes: 17 additions & 8 deletions src/copas.lua
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,11 @@ function copas.receive(client, pattern, part)
repeat
s, err, part = client:receive(pattern, part)

-- guarantees that high throughput doesn't take other threads to starvation
if (math.random(100) > 90) then
copas.sleep(0)
end

if s then
current_log[client] = nil
sto_timeout()
Expand Down Expand Up @@ -517,6 +522,11 @@ function copas.receivefrom(client, size)
repeat
s, err, port = client:receivefrom(size) -- upon success err holds ip address

-- garantees that high throughput doesn't take other threads to starvation
if (math.random(100) > 90) then
copas.sleep(0)
end

if s then
_reading_log[client] = nil
sto_timeout()
Expand Down Expand Up @@ -548,6 +558,11 @@ function copas.receivePartial(client, pattern, part)
repeat
s, err, part = client:receive(pattern, part)

-- guarantees that high throughput doesn't take other threads to starvation
if (math.random(100) > 90) then
copas.sleep(0)
end

if s or (type(pattern) == "number" and part ~= "" and part ~= nil) then
current_log[client] = nil
sto_timeout()
Expand Down Expand Up @@ -590,15 +605,9 @@ function copas.send(client, data, from, to)
repeat
s, err, lastIndex = client:send(data, lastIndex + 1, to)

-- adds extra coroutine swap
-- garantees that high throughput doesn't take other threads to starvation
-- guarantees that high throughput doesn't take other threads to starvation
if (math.random(100) > 90) then
current_log[client] = gettime() -- TODO: how to handle this??
if current_log == _writing_log then
coroutine_yield(client, _writing)
else
coroutine_yield(client, _reading)
end
copas.sleep(0)
end

if s then
Expand Down
87 changes: 87 additions & 0 deletions tests/starve.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
-- tests looping 100% in receive/send
-- Should not prevent other threads from running
--
-- Test should;
-- * sleep incremental, not on absolute time so it slowly diverges if the timer
-- thread is being starved
-- * seconds printed and elapsed should stay very close

local copas = require 'copas'
local socket = require 'socket'

--copas.debug.start()

local body = ("A"):rep(1024*1024*50) -- 50 mb string
local done = 0

local function runtest()
local s1 = socket.bind('*', 49500)
copas.addserver(s1, copas.handler(function(skt)
copas.setsocketname("Server 49500", skt)
copas.setthreadname("Server 49500")
print "Server 49500 accepted incoming connection"
local end_time = socket.gettime() + 30 -- we run for 30 seconds
while end_time > socket.gettime() do
local res, err, _ = skt:receive(1) -- single byte from 50mb chunks
if res == nil and err ~= "timeout" then
print("Server 49500 returned: " .. err)
os.exit(1)
end
end
done = done + 1
print("Server reading port 49500... Done!")
skt:close()
copas.removeserver(s1)
end))

copas.addnamedthread("Client 49500", function()
local skt = socket.tcp()
skt = copas.wrap(skt)
copas.setsocketname("Client 49500", skt)
skt:connect("localhost", 49500)
local last_byte_sent, err, complete
while not complete do
repeat
last_byte_sent, err = skt:send(body, last_byte_sent or 1, -1)
if err == "closed" then
-- server closed connection, so exit, test is finished
complete = true
break
end
if last_byte_sent == nil and err ~= "timeout" then
print("client 49500 returned: " .. err)
os.exit(1)
end
until last_byte_sent == nil or last_byte_sent == #body
end
print("Client writing port 49500... Done!")
skt:close()
done = done + 1
end)

copas.addnamedthread("test timeout thread", function()
local i = 0
local start = socket.gettime()
while done ~= 2 do
copas.sleep(1) -- delta sleep, so it slowly diverges if starved
i = i + 1
local time_passed = socket.gettime()-start
print("slept "..i.." seconds, time passed: ".. time_passed.." seconds")
if math.abs(i - time_passed) > 2 then
print("timer diverged by more than 2 seconds: failed!")
os.exit(1)
end
if i > 60 then
print"timeout"
os.exit(1)
end
end
print "success!"
end)

print("starting loop")
copas.loop()
print("Loop done")
end

runtest()