Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 48 additions & 25 deletions src/pgsql_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

-export([startup/3, auth/2, initializing/2, ready/2, ready/3]).
-export([querying/2, parsing/2, binding/2, describing/2]).
-export([executing/2, closing/2, synchronizing/2, timeout/2]).
-export([aborted/3]).
-export([executing/2, closing/2, synchronizing/2, synchronizing/3]).
-export([timeout/2]).

-include("pgsql.hrl").

Expand All @@ -27,6 +27,8 @@
reply,
reply_to,
async,
autosync=false,
callers=[],
backend,
statement,
txstatus}).
Expand All @@ -46,31 +48,31 @@ connect(C, Host, Username, Password, Opts) ->
gen_fsm:sync_send_event(C, {connect, Host, Username, Password, Opts}, infinity).

get_parameter(C, Name) ->
gen_fsm:sync_send_event(C, {get_parameter, to_binary(Name)}).
sync_send_event(C, {get_parameter, to_binary(Name)}, infinity).

squery(C, Sql) ->
gen_fsm:sync_send_event(C, {squery, Sql}, infinity).
sync_send_event(C, {squery, Sql}, infinity).

equery(C, Statement, Parameters) ->
gen_fsm:sync_send_event(C, {equery, Statement, Parameters}, infinity).
sync_send_event(C, {equery, Statement, Parameters}, infinity).

parse(C, Name, Sql, Types) ->
gen_fsm:sync_send_event(C, {parse, Name, Sql, Types}, infinity).
sync_send_event(C, {parse, Name, Sql, Types}, infinity).

bind(C, Statement, PortalName, Parameters) ->
gen_fsm:sync_send_event(C, {bind, Statement, PortalName, Parameters}, infinity).
sync_send_event(C, {bind, Statement, PortalName, Parameters}, infinity).

execute(C, Statement, PortalName, MaxRows) ->
gen_fsm:sync_send_event(C, {execute, Statement, PortalName, MaxRows}, infinity).
sync_send_event(C, {execute, Statement, PortalName, MaxRows}, infinity).

describe(C, Type, Name) ->
gen_fsm:sync_send_event(C, {describe, Type, Name}, infinity).
sync_send_event(C, {describe, Type, Name}, infinity).

close(C, Type, Name) ->
gen_fsm:sync_send_event(C, {close, Type, Name}, infinity).
sync_send_event(C, {close, Type, Name}, infinity).

sync(C) ->
gen_fsm:sync_send_event(C, sync, infinity).
sync_send_event(C, sync, infinity).

%% -- gen_fsm implementation --

Expand Down Expand Up @@ -295,6 +297,12 @@ ready(sync, From, State) ->
State2 = State#state{reply = ok, reply_to = From},
{next_state, synchronizing, State2, Timeout}.

%% If a caller sends a request while the connection is autosyncing
%% store the caller's pid and tell them to wait.
synchronizing(_Msg, {Pid, _Tag}, #state{callers = Callers, autosync = true} = State) ->
{reply, {wait, synchronizing}, synchronizing, State#state{callers = [Pid|Callers]}}.


%% BindComplete
querying({$2, <<>>}, State) ->
#state{timeout = Timeout, statement = #statement{columns = Columns}} = State,
Expand Down Expand Up @@ -470,7 +478,11 @@ executing(timeout, State) ->
executing({error, E}, State) ->
#state{timeout = Timeout} = State,
notify(State, {error, E}),
{next_state, aborted, State, Timeout}.
%% Send sync command to database and transition immediately
%% to synchronizing state. This automatically handles the
%% case where the driver needs to checkpoint after an error
send(State, $S, []),
{next_state, synchronizing, State#state{autosync = true}, Timeout}.

%% CloseComplete
closing({$3, <<>>}, State) ->
Expand Down Expand Up @@ -500,9 +512,8 @@ synchronizing(timeout, State) ->

%% ReadyForQuery
synchronizing({$Z, <<Status:8>>}, State) ->
#state{reply = Reply, reply_to = Reply_To} = State,
gen_fsm:reply(Reply_To, Reply),
{next_state, ready, State#state{reply = undefined, txstatus = Status}}.
State1 = maybe_reply(State),
{next_state, ready, State1#state{reply = undefined, txstatus = Status, autosync = false}}.

timeout({$Z, <<Status:8>>}, State) ->
notify(State, timeout),
Expand All @@ -516,16 +527,6 @@ timeout(_Event, State) ->
#state{timeout = Timeout} = State,
{next_state, timeout, State, Timeout}.

aborted(sync, From, State) ->
#state{timeout = Timeout} = State,
send(State, $S, []),
State2 = State#state{reply = ok, reply_to = From},
{next_state, synchronizing, State2, Timeout};

aborted(_Msg, _From, State) ->
#state{timeout = Timeout} = State,
{reply, {error, sync_required}, aborted, State, Timeout}.

%% -- internal functions --

%% decode data
Expand Down Expand Up @@ -659,3 +660,25 @@ hex(Bin) ->

send(#state{sock = Sock}, Type, Data) ->
pgsql_sock:send(Sock, Type, Data).

%% If autosyncing, tell callers we are done so they can
%% retry their request
maybe_reply(#state{autosync = true, callers = Callers} = State) ->
[Caller ! {ok, proceed} || Caller <- Callers],
State#state{callers = []};
%% If not autosyncing, send FSM reply as usual
maybe_reply(#state{reply = Reply, reply_to = Reply_To, autosync = false} = State) ->
gen_fsm:reply(Reply_To, Reply),
State.

%% send event and handle connection autosync
sync_send_event(C, Event, Timeout) ->
case gen_fsm:sync_send_event(C, Event, Timeout) of
{wait, synchronizing} ->
receive
{ok, proceed} ->
sync_send_event(C, Event, Timeout)
end;
R ->
R
end.
2 changes: 1 addition & 1 deletion test_src/pgsql_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ execute_error_test() ->
{ok, S} = pgsql:parse(C, "insert into test_table1 (id, value) values ($1, $2)"),
ok = pgsql:bind(C, S, [1, <<"foo">>]),
{error, #error{code = <<"23505">>}} = pgsql:execute(C, S, 0),
{error, sync_required} = pgsql:bind(C, S, [3, <<"quux">>]),
ok = pgsql:bind(C, S, [3, <<"quux">>]),
ok = pgsql:sync(C),
ok = pgsql:bind(C, S, [3, <<"quux">>]),
{ok, _} = pgsql:execute(C, S, 0),
Expand Down