Skip to content

Commit b92ef6e

Browse files
committed
add busy tracking to prevent multiple coroutines from using the socket at the same time
1 parent 6df2ad9 commit b92ef6e

File tree

2 files changed

+86
-14
lines changed

2 files changed

+86
-14
lines changed

pgmoon/init.lua

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,10 @@ do
245245
return self:set_type_deserializer(tonumber(res.oid), "hstore")
246246
end,
247247
connect = function(self)
248+
if self.busy then
249+
error("pgmoon: connection is busy")
250+
end
251+
self.busy = true
248252
local connect_opts
249253
local _exp_0 = self.sock_type
250254
if "nginx" == _exp_0 then
@@ -256,41 +260,55 @@ do
256260
end
257261
local ok, err = self.sock:connect(self.config.host, self.config.port, connect_opts)
258262
if not (ok) then
263+
self.busy = false
259264
return nil, err
260265
end
261266
if self.sock:getreusedtimes() == 0 then
262267
if self.config.ssl then
263268
local success
264269
success, err = self:send_ssl_message()
265270
if not (success) then
271+
self.busy = false
266272
return nil, err
267273
end
268274
end
269275
local success
270276
success, err = self:send_startup_message()
271277
if not (success) then
278+
self.busy = false
272279
return nil, err
273280
end
274281
success, err = self:auth()
275282
if not (success) then
283+
self.busy = false
276284
return nil, err
277285
end
278286
success, err = self:wait_until_ready()
279287
if not (success) then
288+
self.busy = false
280289
return nil, err
281290
end
282291
end
292+
self.busy = false
283293
return true
284294
end,
285295
settimeout = function(self, ...)
286296
return self.sock:settimeout(...)
287297
end,
288298
disconnect = function(self)
299+
if self.busy then
300+
error("pgmoon: connection is busy")
301+
end
302+
self.busy = true
289303
self:send_message(MSG_TYPE_F.terminate, { })
290-
return self.sock:close()
304+
return self:unbusy(self.sock:close())
291305
end,
292306
keepalive = function(self, ...)
293-
return self.sock:setkeepalive(...)
307+
if self.busy then
308+
error("pgmoon: connection is busy")
309+
end
310+
self.busy = true
311+
return self:unbusy(self.sock:setkeepalive(...))
294312
end,
295313
create_cqueues_openssl_context = function(self)
296314
if not (self.config.ssl_verify ~= nil or self.config.cert or self.config.key or self.config.ssl_version) then
@@ -561,15 +579,23 @@ do
561579
return self:simple_query(q)
562580
end
563581
end,
582+
unbusy = function(self, ...)
583+
self.busy = false
584+
return ...
585+
end,
564586
simple_query = function(self, q)
565587
if q:find(NULL) then
566588
return nil, "invalid null byte in query"
567589
end
590+
if self.busy then
591+
error("pgmoon: connection is busy")
592+
end
593+
self.busy = true
568594
self:send_message(MSG_TYPE_F.query, {
569595
q,
570596
NULL
571597
})
572-
return self:receive_query_result()
598+
return self:unbusy(self:receive_query_result())
573599
end,
574600
extended_query = function(self, q, ...)
575601
if q:find(NULL) then
@@ -618,6 +644,10 @@ do
618644
end
619645
end
620646
insert(bind_data, self:encode_int(0, 2))
647+
if self.busy then
648+
error("pgmoon: connection is busy")
649+
end
650+
self.busy = true
621651
self:send_messages({
622652
{
623653
MSG_TYPE_F.parse,
@@ -653,7 +683,7 @@ do
653683
{ }
654684
}
655685
})
656-
return self:receive_query_result()
686+
return self:unbusy(self:receive_query_result())
657687
end,
658688
receive_query_result = function(self)
659689
local row_desc, data_rows, command_complete, err_msg
@@ -715,13 +745,19 @@ do
715745
return result, num_queries, notifications, notices
716746
end,
717747
wait_for_notification = function(self)
748+
if self.busy then
749+
error("pgmoon: connection is busy")
750+
end
751+
self.busy = true
718752
while true do
719753
local t, msg = self:receive_message()
720754
if not (t) then
755+
self.busy = false
721756
return nil, msg
722757
end
723758
local _exp_0 = t
724759
if MSG_TYPE_B.notification == _exp_0 then
760+
self.busy = false
725761
return self:parse_notification(msg)
726762
end
727763
end
@@ -1097,6 +1133,7 @@ do
10971133
})
10981134
self.convert_null = self.config.convert_null
10991135
self.sock, self.sock_type = socket.new(self.config.socket_type)
1136+
self.busy = false
11001137
end,
11011138
__base = _base_0,
11021139
__name = "Postgres"

pgmoon/init.moon

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -249,8 +249,12 @@ class Postgres
249249

250250
@convert_null = @config.convert_null
251251
@sock, @sock_type = socket.new @config.socket_type
252+
@busy = false
252253

253254
connect: =>
255+
error "pgmoon: connection is busy" if @busy
256+
@busy = true
257+
254258
connect_opts = switch @sock_type
255259
when "nginx"
256260
{
@@ -260,33 +264,48 @@ class Postgres
260264
}
261265

262266
ok, err = @sock\connect @config.host, @config.port, connect_opts
263-
return nil, err unless ok
267+
unless ok
268+
@busy = false
269+
return nil, err
264270

265271
if @sock\getreusedtimes! == 0
266272
if @config.ssl
267273
success, err = @send_ssl_message!
268-
return nil, err unless success
274+
unless success
275+
@busy = false
276+
return nil, err
269277

270278
success, err = @send_startup_message!
271-
return nil, err unless success
279+
unless success
280+
@busy = false
281+
return nil, err
272282

273283
success, err = @auth!
274-
return nil, err unless success
284+
unless success
285+
@busy = false
286+
return nil, err
275287

276288
success, err = @wait_until_ready!
277-
return nil, err unless success
289+
unless success
290+
@busy = false
291+
return nil, err
278292

293+
@busy = false
279294
true
280295

281296
settimeout: (...) =>
282297
@sock\settimeout ...
283298

284299
disconnect: =>
300+
error "pgmoon: connection is busy" if @busy
301+
@busy = true
285302
@send_message MSG_TYPE_F.terminate, {}
286-
@sock\close!
303+
@unbusy @sock\close!
287304

288305
keepalive: (...) =>
289-
@sock\setkeepalive ...
306+
error "pgmoon: connection is busy" if @busy
307+
@busy = true
308+
@unbusy @sock\setkeepalive ...
290309

291310
-- see: http://25thandclement.com/~william/projects/luaossl.pdf
292311
create_cqueues_openssl_context: =>
@@ -566,14 +585,21 @@ class Postgres
566585
@simple_query q
567586

568587

588+
unbusy: (...) =>
589+
@busy = false
590+
...
591+
569592
-- query using the "simple" query protocol
570593
-- supports multiple queries, but no parameters
571594
simple_query: (q) =>
572595
if q\find NULL
573596
return nil, "invalid null byte in query"
574597

598+
error "pgmoon: connection is busy" if @busy
599+
@busy = true
600+
575601
@send_message MSG_TYPE_F.query, {q, NULL}
576-
@receive_query_result!
602+
@unbusy @receive_query_result!
577603

578604
-- query using the "extended" query protocol
579605
-- supports only a single query, and parameters
@@ -629,6 +655,9 @@ class Postgres
629655

630656
insert bind_data, @encode_int 0, 2 -- number of result format codes, 0 to default to all text
631657

658+
error "pgmoon: connection is busy" if @busy
659+
@busy = true
660+
632661
@send_messages {
633662
{ MSG_TYPE_F.parse, parse_data }
634663
{ MSG_TYPE_F.bind, bind_data }
@@ -659,7 +688,7 @@ class Postgres
659688
}
660689
}
661690

662-
@receive_query_result!
691+
@unbusy @receive_query_result!
663692

664693
-- NOTE: this is called for both the simple query and the extended query protocol
665694
receive_query_result: =>
@@ -714,11 +743,17 @@ class Postgres
714743
result, num_queries, notifications, notices
715744

716745
wait_for_notification: =>
746+
error "pgmoon: connection is busy" if @busy
747+
@busy = true
748+
717749
while true
718750
t, msg = @receive_message!
719-
return nil, msg unless t
751+
unless t
752+
@busy = false
753+
return nil, msg
720754
switch t
721755
when MSG_TYPE_B.notification
756+
@busy = false
722757
return @parse_notification(msg)
723758

724759
format_query_result: (row_desc, data_rows, command_complete) =>

0 commit comments

Comments
 (0)