From faf58766745ba9b11b904c8363673b10c77c2717 Mon Sep 17 00:00:00 2001 From: lukyanov Date: Thu, 24 Aug 2017 16:53:46 +0000 Subject: [PATCH 1/4] Make `with_transaction` work with explicitly specifed pool name. --- src/pgapp_worker.erl | 49 ++++++++++++++++++++++---------------------- 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/src/pgapp_worker.erl b/src/pgapp_worker.erl index 5684498..106a233 100644 --- a/src/pgapp_worker.erl +++ b/src/pgapp_worker.erl @@ -26,15 +26,10 @@ -define(MAXIMUM_DELAY, 5 * 60 * 1000). % Five minutes -define(TIMEOUT, 5 * 1000). --define(STATE_VAR, '$pgapp_state'). +-define(TX_CONNECTION_VAR(PoolName), {'$pgapp_tx_connection', PoolName}). squery(Sql) -> - case get(?STATE_VAR) of - undefined -> - squery(epgsql_pool, Sql); - Conn -> - epgsql:squery(Conn, Sql) - end. + squery(epgsql_pool, Sql). squery(PoolName, Sql) when is_atom(PoolName) -> squery(PoolName, Sql, ?TIMEOUT); @@ -42,30 +37,34 @@ squery(Sql, Timeout) -> squery(epgsql_pool, Sql, Timeout). squery(PoolName, Sql, Timeout) -> - middle_man_transaction(PoolName, - fun (W) -> - gen_server:call(W, {squery, Sql}, Timeout) - end, Timeout). - -equery(Sql, Params) -> - case get(?STATE_VAR) of + case get(?TX_CONNECTION_VAR(PoolName)) of undefined -> - equery(epgsql_pool, Sql, Params); + middle_man_transaction(PoolName, + fun (W) -> + gen_server:call(W, {squery, Sql}, Timeout) + end, Timeout); Conn -> - epgsql:equery(Conn, Sql, Params) + epgsql:squery(Conn, Sql) end. +equery(Sql, Params) -> + equery(epgsql_pool, Sql, Params). + equery(PoolName, Sql, Params) when is_atom(PoolName) -> equery(PoolName, Sql, Params, ?TIMEOUT); equery(Sql, Params, Timeout) -> equery(epgsql_pool, Sql, Params, Timeout). equery(PoolName, Sql, Params, Timeout) -> - middle_man_transaction(PoolName, - fun (W) -> - gen_server:call(W, {equery, Sql, Params}, - Timeout) - end, Timeout). + case get(?TX_CONNECTION_VAR(PoolName)) of + undefined -> + middle_man_transaction(PoolName, + fun (W) -> + gen_server:call(W, {equery, Sql, Params}, Timeout) + end, Timeout); + Conn -> + epgsql:equery(Conn, Sql, Params) + end. with_transaction(PoolName, Fun) -> with_transaction(PoolName, Fun, ?TIMEOUT). @@ -73,7 +72,7 @@ with_transaction(PoolName, Fun) -> with_transaction(PoolName, Fun, Timeout) -> middle_man_transaction(PoolName, fun (W) -> - gen_server:call(W, {transaction, Fun}, + gen_server:call(W, {transaction, PoolName, Fun}, Timeout) end, Timeout). @@ -110,11 +109,11 @@ handle_call({squery, Sql}, _From, handle_call({equery, Sql, Params}, _From, #state{conn = Conn} = State) -> {reply, epgsql:equery(Conn, Sql, Params), State}; -handle_call({transaction, Fun}, _From, +handle_call({transaction, PoolName, Fun}, _From, #state{conn = Conn} = State) -> - put(?STATE_VAR, Conn), + put(?TX_CONNECTION_VAR(PoolName), Conn), Result = epgsql:with_transaction(Conn, fun(_) -> Fun() end), - erase(?STATE_VAR), + erase(?TX_CONNECTION_VAR(PoolName)), {reply, Result, State}. handle_cast(reconnect, State) -> From a59a9b5acbd6239615bba9f3711316bdb706b48f Mon Sep 17 00:00:00 2001 From: lukyanov Date: Fri, 25 Aug 2017 07:39:41 +0000 Subject: [PATCH 2/4] Revert "Make `with_transaction` work with explicitly specifed pool name." This reverts commit faf58766745ba9b11b904c8363673b10c77c2717. --- src/pgapp_worker.erl | 49 ++++++++++++++++++++++---------------------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/src/pgapp_worker.erl b/src/pgapp_worker.erl index 106a233..5684498 100644 --- a/src/pgapp_worker.erl +++ b/src/pgapp_worker.erl @@ -26,10 +26,15 @@ -define(MAXIMUM_DELAY, 5 * 60 * 1000). % Five minutes -define(TIMEOUT, 5 * 1000). --define(TX_CONNECTION_VAR(PoolName), {'$pgapp_tx_connection', PoolName}). +-define(STATE_VAR, '$pgapp_state'). squery(Sql) -> - squery(epgsql_pool, Sql). + case get(?STATE_VAR) of + undefined -> + squery(epgsql_pool, Sql); + Conn -> + epgsql:squery(Conn, Sql) + end. squery(PoolName, Sql) when is_atom(PoolName) -> squery(PoolName, Sql, ?TIMEOUT); @@ -37,34 +42,30 @@ squery(Sql, Timeout) -> squery(epgsql_pool, Sql, Timeout). squery(PoolName, Sql, Timeout) -> - case get(?TX_CONNECTION_VAR(PoolName)) of + middle_man_transaction(PoolName, + fun (W) -> + gen_server:call(W, {squery, Sql}, Timeout) + end, Timeout). + +equery(Sql, Params) -> + case get(?STATE_VAR) of undefined -> - middle_man_transaction(PoolName, - fun (W) -> - gen_server:call(W, {squery, Sql}, Timeout) - end, Timeout); + equery(epgsql_pool, Sql, Params); Conn -> - epgsql:squery(Conn, Sql) + epgsql:equery(Conn, Sql, Params) end. -equery(Sql, Params) -> - equery(epgsql_pool, Sql, Params). - equery(PoolName, Sql, Params) when is_atom(PoolName) -> equery(PoolName, Sql, Params, ?TIMEOUT); equery(Sql, Params, Timeout) -> equery(epgsql_pool, Sql, Params, Timeout). equery(PoolName, Sql, Params, Timeout) -> - case get(?TX_CONNECTION_VAR(PoolName)) of - undefined -> - middle_man_transaction(PoolName, - fun (W) -> - gen_server:call(W, {equery, Sql, Params}, Timeout) - end, Timeout); - Conn -> - epgsql:equery(Conn, Sql, Params) - end. + middle_man_transaction(PoolName, + fun (W) -> + gen_server:call(W, {equery, Sql, Params}, + Timeout) + end, Timeout). with_transaction(PoolName, Fun) -> with_transaction(PoolName, Fun, ?TIMEOUT). @@ -72,7 +73,7 @@ with_transaction(PoolName, Fun) -> with_transaction(PoolName, Fun, Timeout) -> middle_man_transaction(PoolName, fun (W) -> - gen_server:call(W, {transaction, PoolName, Fun}, + gen_server:call(W, {transaction, Fun}, Timeout) end, Timeout). @@ -109,11 +110,11 @@ handle_call({squery, Sql}, _From, handle_call({equery, Sql, Params}, _From, #state{conn = Conn} = State) -> {reply, epgsql:equery(Conn, Sql, Params), State}; -handle_call({transaction, PoolName, Fun}, _From, +handle_call({transaction, Fun}, _From, #state{conn = Conn} = State) -> - put(?TX_CONNECTION_VAR(PoolName), Conn), + put(?STATE_VAR, Conn), Result = epgsql:with_transaction(Conn, fun(_) -> Fun() end), - erase(?TX_CONNECTION_VAR(PoolName)), + erase(?STATE_VAR), {reply, Result, State}. handle_cast(reconnect, State) -> From 12af53b351809eb349d35f6fdd06a9b7fef6df15 Mon Sep 17 00:00:00 2001 From: lukyanov Date: Fri, 25 Aug 2017 13:20:38 +0000 Subject: [PATCH 3/4] `with_transaction` now accepts Fun/1 The callback in `with_transaction` is now a function with arity 1. The argument of the function is the connection to be used inside the function when running queries. --- src/pgapp.erl | 42 +++++++++++++++++++++++++++--------------- src/pgapp_worker.erl | 30 +++++++++++------------------- 2 files changed, 38 insertions(+), 34 deletions(-) diff --git a/src/pgapp.erl b/src/pgapp.erl index 04234e0..265226a 100644 --- a/src/pgapp.erl +++ b/src/pgapp.erl @@ -42,6 +42,10 @@ equery(Sql, Params) -> (PoolName :: atom(), Sql::epgsql:sql_query(), Params :: list(epgsql:bind_param())) + -> epgsql:reply(epgsql:equery_row()) | {error, Reason :: any()}; + (Conn :: pid(), + Sql::epgsql:sql_query(), + Params :: list(epgsql:bind_param())) -> epgsql:reply(epgsql:equery_row()) | {error, Reason :: any()}. equery(P1, P2, P3) -> pgapp_worker:equery(P1, P2, P3). @@ -66,12 +70,14 @@ squery(Sql) -> [epgsql:reply(epgsql:squery_row())] | {error, Reason :: any()}; (PoolName :: atom(), Sql::epgsql:sql_query()) + -> epgsql:reply(epgsql:squery_row()) | + [epgsql:reply(epgsql:squery_row())] | {error, Reason :: any()}; + (Conn :: pid(), + Sql::epgsql:sql_query()) -> epgsql:reply(epgsql:squery_row()) | [epgsql:reply(epgsql:squery_row())] | {error, Reason :: any()}. -squery(PoolName, Sql) when is_atom(PoolName) -> - pgapp_worker:squery(PoolName, Sql); -squery(Sql, Timeout) -> - pgapp_worker:squery(Sql, Timeout). +squery(P1, P2) -> + pgapp_worker:squery(P1, P2). -spec squery(PoolName :: atom(), Sql :: epgsql:sql_query(), @@ -81,29 +87,35 @@ squery(Sql, Timeout) -> squery(PoolName, Sql, Timeout) -> pgapp_worker:squery(PoolName, Sql, Timeout). --spec with_transaction(Function :: fun(() -> Reply)) +-spec with_transaction(Function :: fun((pid()) -> Reply)) -> Reply | {rollback | error, any()} when Reply :: any(). -with_transaction(Fun) when is_function(Fun, 0) -> +with_transaction(Fun) when is_function(Fun, 1) -> with_transaction(epgsql_pool, Fun). -spec with_transaction(PoolName :: atom(), - Function :: fun(() -> Reply)) + Function :: fun((pid()) -> Reply)) + -> Reply | {rollback | error, any()} when Reply :: any(); + (Conn :: pid(), + Function :: fun((pid()) -> Reply)) -> Reply | {rollback | error, any()} when Reply :: any(); - (Function :: fun(() -> Reply), + (Function :: fun((pid()) -> Reply), Timeout :: timeout()) -> Reply | {rollback | error, any()} when Reply :: any(). -with_transaction(PoolName, Fun) when is_function(Fun, 0); - is_atom(PoolName) -> - pgapp_worker:with_transaction(PoolName, Fun); -with_transaction(Fun, Timeout) when is_function(Fun, 0) -> +with_transaction(P1, Fun) when is_function(Fun, 1) -> + pgapp_worker:with_transaction(P1, Fun); +with_transaction(Fun, Timeout) when is_function(Fun, 1) -> pgapp_worker:with_transaction(epgsql_pool, Fun, Timeout). -spec with_transaction(PoolName :: atom(), - Function :: fun(() -> Reply), + Function :: fun((pid()) -> Reply), + Timeout :: atom() | non_neg_integer()) + -> Reply | {rollback | error, any()} when Reply :: any(); + (Conn :: pid(), + Function :: fun((pid()) -> Reply), Timeout :: atom() | non_neg_integer()) -> Reply | {rollback | error, any()} when Reply :: any(). -with_transaction(PoolName, Fun, Timeout) when is_function(Fun, 0) -> - pgapp_worker:with_transaction(PoolName, Fun, Timeout). +with_transaction(P1, Fun, Timeout) when is_function(Fun, 1) -> + pgapp_worker:with_transaction(P1, Fun, Timeout). %%-------------------------------------------------------------------- %% @doc diff --git a/src/pgapp_worker.erl b/src/pgapp_worker.erl index 5684498..3b2524f 100644 --- a/src/pgapp_worker.erl +++ b/src/pgapp_worker.erl @@ -26,18 +26,13 @@ -define(MAXIMUM_DELAY, 5 * 60 * 1000). % Five minutes -define(TIMEOUT, 5 * 1000). --define(STATE_VAR, '$pgapp_state'). - squery(Sql) -> - case get(?STATE_VAR) of - undefined -> - squery(epgsql_pool, Sql); - Conn -> - epgsql:squery(Conn, Sql) - end. + squery(epgsql_pool, Sql). squery(PoolName, Sql) when is_atom(PoolName) -> squery(PoolName, Sql, ?TIMEOUT); +squery(Conn, Sql) when is_pid(Conn) -> + epgsql:squery(Conn, Sql); squery(Sql, Timeout) -> squery(epgsql_pool, Sql, Timeout). @@ -48,15 +43,12 @@ squery(PoolName, Sql, Timeout) -> end, Timeout). equery(Sql, Params) -> - case get(?STATE_VAR) of - undefined -> - equery(epgsql_pool, Sql, Params); - Conn -> - epgsql:equery(Conn, Sql, Params) - end. + equery(epgsql_pool, Sql, Params). equery(PoolName, Sql, Params) when is_atom(PoolName) -> equery(PoolName, Sql, Params, ?TIMEOUT); +equery(Conn, Sql, Params) when is_pid(Conn) -> + epgsql:equery(Conn, Sql, Params); equery(Sql, Params, Timeout) -> equery(epgsql_pool, Sql, Params, Timeout). @@ -70,12 +62,14 @@ equery(PoolName, Sql, Params, Timeout) -> with_transaction(PoolName, Fun) -> with_transaction(PoolName, Fun, ?TIMEOUT). -with_transaction(PoolName, Fun, Timeout) -> +with_transaction(PoolName, Fun, Timeout) when is_atom(PoolName) -> middle_man_transaction(PoolName, fun (W) -> gen_server:call(W, {transaction, Fun}, Timeout) - end, Timeout). + end, Timeout); +with_transaction(Conn, Fun, _Timeout) when is_pid(Conn) -> + epgsql:with_transaction(Conn, Fun). middle_man_transaction(Pool, Fun, Timeout) -> Tag = make_ref(), @@ -112,9 +106,7 @@ handle_call({equery, Sql, Params}, _From, {reply, epgsql:equery(Conn, Sql, Params), State}; handle_call({transaction, Fun}, _From, #state{conn = Conn} = State) -> - put(?STATE_VAR, Conn), - Result = epgsql:with_transaction(Conn, fun(_) -> Fun() end), - erase(?STATE_VAR), + Result = epgsql:with_transaction(Conn, Fun), {reply, Result, State}. handle_cast(reconnect, State) -> From 98bf9195f2da0346ff5bbb9f63ac39142fafb70b Mon Sep 17 00:00:00 2001 From: lukyanov Date: Fri, 25 Aug 2017 13:29:20 +0000 Subject: [PATCH 4/4] Fix README.md --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 756fe4a..51f694c 100644 --- a/README.md +++ b/README.md @@ -27,10 +27,10 @@ API use: application:ensure_all_started(pgapp). pgapp:connect([{size, 10}, {database, "mydb"}, {username, "foo"}, {password, "bar"}]). pgapp:equery("select current_date", []), - pgapp:with_transaction(fun() -> - pgapp:squery("update ..."), - pgapp:squery("delete from ..."), - pgapp:equery("select ? from ?", ["*", Table]) + pgapp:with_transaction(fun(Conn) -> + pgapp:squery(Conn, "update ..."), + pgapp:squery(Conn, "delete from ..."), + pgapp:equery(Conn, "select ? from ?", ["*", Table]) end). - Multi pool: