diff --git a/pgapp.config.sample b/pgapp.config.sample index bfdf587..f21e192 100644 --- a/pgapp.config.sample +++ b/pgapp.config.sample @@ -19,7 +19,19 @@ {database, "db2"}, {username, "user"}, {password, "pass"} - ]} + ]}, + {pool3, [ + {size, 10}, + {max_overflow, 20} + ], + [ + {host, "localhost"}, + {database, "db3"}, + {username, "user"}, + {password, "pass"} + ], + "priv/prepared.sql"}, + ]} ] }]. diff --git a/priv/empty.sql b/priv/empty.sql new file mode 100644 index 0000000..e69de29 diff --git a/priv/prepared.sql b/priv/prepared.sql new file mode 100644 index 0000000..0907f81 --- /dev/null +++ b/priv/prepared.sql @@ -0,0 +1,2 @@ +{"simple", "select 1+1"}. +{"increment", "select $1+1"}. \ No newline at end of file diff --git a/src/pgapp.erl b/src/pgapp.erl index 949eb1c..d0bd281 100644 --- a/src/pgapp.erl +++ b/src/pgapp.erl @@ -9,9 +9,11 @@ -module(pgapp). %% API + -export([connect/1, connect/2, equery/2, equery/3, equery/4, squery/1, squery/2, squery/3, + prepared_query/2, prepared_query/3, prepared_query/4, with_transaction/1, with_transaction/2, with_transaction/3]). %%%=================================================================== @@ -54,9 +56,42 @@ equery(P1, P2, P3) -> equery(PoolName, Sql, Params, Timeout) -> pgapp_worker:equery(PoolName, Sql, Params, Timeout). + + + + +-spec prepared_query(Name :: string(), + Params :: list(epgsql:bind_param())) + -> epgsql:reply(epgsql:equery_row()). +prepared_query(Name, Params) -> + pgapp_worker:prepared_query(Name, Params). + +-spec prepared_query(Name :: string(), + Params :: list(epgsql:bind_param()), + Timeout :: atom() | integer()) + -> epgsql:reply(epgsql:equery_row()); + (PoolName :: atom(), + Name :: string(), + Params :: list(epgsql:bind_param())) + -> epgsql:reply(epgsql:equery_row()). +prepared_query(P1, P2, P3) -> + pgapp_worker:prepared_query(P1, P2, P3). + +-spec prepared_query(PoolName :: atom(), + Name :: string(), + Params :: list(epgsql:bind_param()), + Timeout :: atom() | integer()) + -> epgsql:reply(epgsql:equery_row()). +prepared_query(PoolName, Sql, Params, Timeout) -> + pgapp_worker:prepared_query(PoolName, Sql, Params, Timeout). + + + + -spec squery(Sql :: epgsql:sql_query()) - -> epgsql:reply(epgsql:squery_row()) | - [epgsql:reply(epgsql:squery_row())]. + -> epgsql:reply(epgsql:squery_row()) | + [epgsql:reply(epgsql:squery_row())]. + squery(Sql) -> pgapp_worker:squery(Sql). diff --git a/src/pgapp_sup.erl b/src/pgapp_sup.erl index b135376..8d399e7 100644 --- a/src/pgapp_sup.erl +++ b/src/pgapp_sup.erl @@ -2,6 +2,8 @@ -behaviour(supervisor). +-include("worker_args.hrl"). + %% API -export([start_link/0, add_pool/3]). @@ -24,13 +26,30 @@ start_link() -> init([]) -> {ok, Pools} = application:get_env(pgapp, pools), - PoolSpec = lists:map(fun ({PoolName, SizeArgs, WorkerArgs}) -> - PoolArgs = [{name, {local, PoolName}}, - {worker_module, pgapp_worker}] ++ SizeArgs, - poolboy:child_spec(PoolName, PoolArgs, WorkerArgs) + PoolSpec = lists:map(fun ({PoolName, SizeArgs, ConnectionArgs}) -> + make_child_spec(PoolName, SizeArgs, ConnectionArgs, undefined); + ({PoolName, SizeArgs, ConnectionArgs, SQLFile}) -> + make_child_spec(PoolName, SizeArgs, ConnectionArgs, SQLFile) end, Pools), {ok, { {one_for_one, 10, 10}, PoolSpec} }. -add_pool(Name, PoolArgs, WorkerArgs) -> - ChildSpec = poolboy:child_spec(Name, PoolArgs, WorkerArgs), - supervisor:start_child(?MODULE, ChildSpec). +make_child_spec(PoolName, SizeArgs, ConnectionArgs, SQLFile) -> + PoolArgs = [{name, {local, PoolName}}, + {worker_module, pgapp_worker}] ++ SizeArgs, + poolboy:child_spec(PoolName, PoolArgs, make_worker_args(ConnectionArgs, SQLFile)). + +make_worker_args(ConnectionArgs, SQLFileName) -> + SQL = case SQLFileName of + undefined -> []; + File -> + {ok, PreparedSQL} = file:consult(File), + PreparedSQL + end, + #worker_args{connection_args = ConnectionArgs, prepared_sql = SQL}. + +add_pool(Name, PoolArgs, ConnectionArgs) -> + add_pool(Name, PoolArgs, ConnectionArgs, undefined). + +add_pool(Name, PoolArgs, ConnectionArgs, SQLFile) -> + ChildSpec = poolboy:child_spec(Name, PoolArgs, make_worker_args(ConnectionArgs, SQLFile)), + supervisor:start_child(?MODULE, ChildSpec). diff --git a/src/pgapp_worker.erl b/src/pgapp_worker.erl index cc1e8e1..82397f6 100644 --- a/src/pgapp_worker.erl +++ b/src/pgapp_worker.erl @@ -10,6 +10,7 @@ -export([squery/1, squery/2, squery/3, equery/2, equery/3, equery/4, + prepared_query/2, prepared_query/3, prepared_query/4, with_transaction/2, with_transaction/3]). -export([start_link/1]). @@ -20,7 +21,8 @@ -record(state, {conn::pid(), delay::pos_integer(), timer::timer:tref(), - start_args::proplists:proplist()}). + start_args::proplists:proplist(), + sql_text::proplists:proplist()}). -define(INITIAL_DELAY, 500). % Half a second -define(MAXIMUM_DELAY, 5 * 60 * 1000). % Five minutes @@ -67,6 +69,26 @@ equery(PoolName, Sql, Params, Timeout) -> {equery, Sql, Params}, Timeout) end, Timeout). +prepared_query(Name, Params) -> + case get(?STATE_VAR) of + undefined -> + prepared_query(epgsql_pool, Name, Params); + Conn -> + epgsql:prepared_query(Conn, Name, Params) + end. + +prepared_query(PoolName, Name, Params) when is_atom(PoolName) -> + prepared_query(PoolName, Name, Params, ?TIMEOUT); +prepared_query(Name, Params, Timeout) -> + prepared_query(epgsql_pool, Name, Params, Timeout). + +prepared_query(PoolName, Name, Params, Timeout) -> + poolboy:transaction(PoolName, + fun (Worker) -> + gen_server:call(Worker, + {prepared_query, Name, Params}, Timeout) + end, Timeout). + with_transaction(PoolName, Fun) -> with_transaction(PoolName, Fun, ?TIMEOUT). @@ -77,16 +99,29 @@ with_transaction(PoolName, Fun, Timeout) -> {transaction, Fun}, Timeout) end, Timeout). + +-include_lib("epgsql/include/epgsql.hrl"). +-include("worker_args.hrl"). + start_link(Args) -> gen_server:start_link(?MODULE, Args, []). -init(Args) -> +init(#worker_args{connection_args = Args, prepared_sql = PreparedSQL}) -> process_flag(trap_exit, true), - {ok, connect(#state{start_args = Args, delay = ?INITIAL_DELAY})}. + {ok, connect(#state{ + start_args = Args, + sql_text = PreparedSQL, + delay = ?INITIAL_DELAY + })}. handle_call({squery, Sql}, _From, #state{conn=Conn} = State) when Conn /= undefined -> {reply, epgsql:squery(Conn, Sql), State}; + +handle_call({prepared_query, Name, Params}, _From, + #state{conn = Conn} = State) when Conn /= undefined -> + {reply, epgsql:prepared_query(Conn, Name, Params), State}; + handle_call({equery, Sql, Params}, _From, #state{conn = Conn} = State) when Conn /= undefined -> {reply, epgsql:equery(Conn, Sql, Params), State}; @@ -129,6 +164,21 @@ terminate(_Reason, #state{conn=Conn}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. + +prepare_statements(Con, PreparedSQL) -> + lists:all( + fun({Name, Query}) -> + case epgsql:parse(Con, Name, Query, []) of + {ok, _Statement} -> + true; + {error, Reason} -> + error_logger:error_msg("Error ~p parsing SQL query ~p", [Reason, Query]), + false + end + end, + PreparedSQL + ). + connect(State) -> Args = State#state.start_args, Hostname = proplists:get_value(host, Args), @@ -141,19 +191,34 @@ connect(State) -> "~p Connected to ~s at ~s with user ~s: ~p~n", [self(), Database, Hostname, Username, Conn]), timer:cancel(State#state.timer), - State#state{conn=Conn, delay=?INITIAL_DELAY, timer = undefined}; + case prepare_statements(Conn, State#state.sql_text) of + true -> + State#state{conn=Conn, delay=?INITIAL_DELAY, timer = undefined}; + false -> + ok = epgsql:close(Conn), + NewState = handle_connection_error(State), + error_logger:error_msg( + "~p Unable to prepare statements on ~s at ~s with user ~s " + "- attempting reconnect in ~p ms~n", + [self(), Database, Hostname, Username, NewState#state.delay]), + NewState + end; Error -> - NewDelay = calculate_delay(State#state.delay), + NewState = handle_connection_error(State), error_logger:warning_msg( "~p Unable to connect to ~s at ~s with user ~s (~p) " "- attempting reconnect in ~p ms~n", - [self(), Database, Hostname, Username, Error, NewDelay]), - {ok, Tref} = - timer:apply_after( - State#state.delay, gen_server, cast, [self(), reconnect]), - State#state{conn=undefined, delay = NewDelay, timer = Tref} + [self(), Database, Hostname, Username, Error, NewState#state.delay]), + NewState end. +handle_connection_error(#state{delay = Delay} = State) -> + NewDelay = calculate_delay(Delay), + {ok, Tref} = + timer:apply_after( + Delay, gen_server, cast, [self(), reconnect]), + State#state{conn=undefined, delay = NewDelay, timer = Tref}. + calculate_delay(Delay) when (Delay * 2) >= ?MAXIMUM_DELAY -> ?MAXIMUM_DELAY; calculate_delay(Delay) -> diff --git a/src/worker_args.hrl b/src/worker_args.hrl new file mode 100644 index 0000000..40f81a7 --- /dev/null +++ b/src/worker_args.hrl @@ -0,0 +1,6 @@ +-record( + worker_args, + { + prepared_sql, + connection_args + }). \ No newline at end of file