Skip to content

Commit 9e2c7f1

Browse files
committed
fix(nonblocking): apply suggestions from code review
1 parent 385b681 commit 9e2c7f1

File tree

2 files changed

+50
-25
lines changed

2 files changed

+50
-25
lines changed

src/asyncresults.jl

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,10 @@ function _consume(jl_conn::Connection)
8484
# this is important?
8585
# https://github.com/postgres/postgres/blob/master/src/interfaces/libpq/fe-exec.c#L1266
8686
# if we used non-blocking connections we would need to check for `1` as well
87-
# See flush(jl_conn::Connection) in connections.jl
88-
flush(jl_conn)
87+
# See _flush(jl_conn::Connection) in connections.jl
88+
if !_flush(jl_conn)
89+
error(LOGGER, Errors.PQConnectionError(jl_conn))
90+
end
8991

9092
async_result = jl_conn.async_result
9193
result_ptrs = Ptr{libpq_c.PGresult}[]
@@ -291,10 +293,10 @@ end
291293

292294
function _async_submit(jl_conn::Connection, query::AbstractString)
293295
send_status = libpq_c.PQsendQuery(jl_conn.conn::Ptr{libpq_c.PGconn}, query)
294-
if isnonblocking(jl_conn) == 0
296+
if isnonblocking(jl_conn)
295297
return send_status == 1
296298
else
297-
return flush(jl_conn)
299+
return _flush(jl_conn)
298300
end
299301
end
300302

@@ -317,6 +319,11 @@ function _async_submit(
317319
zeros(Cint, num_params), # all parameters in text format
318320
Cint(binary_format), # return result in text or binary format
319321
)
320-
# send_status must be 1, if nonblock, we also want to flush
321-
return send_status == 1 && (isnonblocking(jl_conn) == 0 || flush(jl_conn))
322+
# send_status must be 1
323+
# if nonblock, we also want to _flush
324+
if isnonblocking(jl_conn)
325+
return send_status == 1 && _flush(jl_conn)
326+
else
327+
return send_status == 1
328+
end
322329
end

src/connections.jl

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,7 @@ function Connection(
267267
throw_error::Bool=true,
268268
connect_timeout::Real=0,
269269
options::Dict{String, String}=CONNECTION_OPTION_DEFAULTS,
270+
nonblocking::Bool=false,
270271
kwargs...
271272
)
272273
if options === CONNECTION_OPTION_DEFAULTS
@@ -300,7 +301,7 @@ function Connection(
300301
)
301302

302303
# If password needed and not entered, prompt the user
303-
if libpq_c.PQconnectionNeedsPassword(jl_conn.conn) == 1
304+
connection = if libpq_c.PQconnectionNeedsPassword(jl_conn.conn) == 1
304305
push!(keywords, "password")
305306
user = unsafe_string(libpq_c.PQuser(jl_conn.conn))
306307
# close this connection; will open another one below with the user-provided password
@@ -309,19 +310,28 @@ function Connection(
309310
pass = Base.getpass(prompt)
310311
push!(values, read(pass, String))
311312
Base.shred!(pass)
312-
return handle_new_connection(
313+
handle_new_connection(
313314
Connection(
314315
_connect_nonblocking(keywords, values, false; timeout=connect_timeout);
315316
kwargs...
316317
);
317318
throw_error=throw_error,
318319
)
319320
else
320-
return handle_new_connection(
321+
handle_new_connection(
321322
jl_conn;
322323
throw_error=throw_error,
323324
)
324325
end
326+
327+
if nonblocking
328+
success = libpq_c.PQsetnonblocking(connection.conn, convert(Cint, nonblock)) == 0
329+
if !success
330+
close(connection)
331+
error(LOGGER, "Could not provide a non-blocking connection")
332+
end
333+
end
334+
return connection
325335
end
326336

327337
# AbstractLock primitives:
@@ -791,9 +801,11 @@ end
791801
socket(jl_conn::Connection) = socket(jl_conn.conn)
792802

793803
"""
804+
isnonblocking(jl_conn::Connection)
805+
794806
Sets the nonblocking connection status of the PG connections.
795807
While async_execute is non-blocking on the receiving side,
796-
the sending side is still nonblockign without this
808+
the sending side is still nonblocking without this
797809
Returns true on success, false on failure
798810
799811
https://www.postgresql.org/docs/current/libpq-async.html
@@ -808,28 +820,37 @@ Returns true if the connection is set to non-blocking, false otherwise
808820
809821
https://www.postgresql.org/docs/current/libpq-async.html
810822
"""
811-
function isnonblocking(jl_conn)
823+
function isnonblocking(jl_conn::Connection)
812824
return libpq_c.PQisnonblocking(jl_conn.conn) == 1
813825
end
814826

815827
"""
816-
Do the flush dance described in the libpq docs. Required when the
828+
_flush(jl_conn::Connection)
829+
830+
Do the _flush dance described in the libpq docs. Required when the
817831
connections are set to nonblocking and we want do send queries/data
818832
without blocking.
819833
820-
https://www.postgresql.org/docs/current/libpq-async.html#LIBPQ-PQFlush
834+
https://www.postgresql.org/docs/current/libpq-async.html#LIBPQ-PQFLUSH
821835
"""
822-
function flush(jl_conn)
823-
watcher = FDWatcher(socket(jl_conn), true, true) # can wait for reads and writes
836+
function _flush(jl_conn::Connection)
837+
local watcher = nothing
838+
if isnonblocking(jl_conn)
839+
watcher = FDWatcher(socket(jl_conn), true, true) # can wait for reads and writes
840+
end
824841
try
825-
while true # Iterators.repeated(true) # would make me more comfotable I think
842+
while true
826843
flushstatus = libpq_c.PQflush(jl_conn.conn)
827844
# 0 indicates success
828-
flushstatus == 0 && return true
845+
if flushstatus == 0
846+
return true
829847
# -1 indicates error
830-
flushstatus < 0 && error(LOGGER, Errors.PQConnectionError(jl_conn))
831-
# Could not send all data without blocking, need to wait FD
832-
flushstatus == 1 && begin
848+
elseif flushstatus < 0
849+
return false
850+
# 1 indicates that we could not send all data without blocking,
851+
elseif flushstatus == 1
852+
# need to wait FD
853+
# Only applicable when the connection is in nonblocking mode
833854
wait(watcher) # Wait for the watcher
834855
# If it becomes write-ready, call PQflush again.
835856
if watcher.mask.writable
@@ -838,15 +859,12 @@ function flush(jl_conn)
838859
if watcher.mask.readable
839860
# if the stream is readable, we have to consume data from the server first.
840861
success = libpq_c.PQconsumeInput(jl_conn.conn) == 1
841-
!success && error(LOGGER, Errors.PQConnectionError(jl_conn))
862+
!success && return false
842863
end
843864
end
844865
end
845-
catch
846-
# We don't want to manage anything here
847-
rethrow()
848866
finally
849867
# Just close the watcher
850-
close(watcher)
868+
!isnothing(watcher) && close(watcher)
851869
end
852870
end

0 commit comments

Comments
 (0)