diff --git a/lib/mnesia/src/mnesia.hrl b/lib/mnesia/src/mnesia.hrl index 3fcedbfd09d0..44c89dd3134e 100644 --- a/lib/mnesia/src/mnesia.hrl +++ b/lib/mnesia/src/mnesia.hrl @@ -53,6 +53,17 @@ (try ?ets_lookup_element(mnesia_gvar, Var, 2) catch error:_:_Stacktrace -> {'EXIT', _Stacktrace} end)). +-define(unalias_and_flush_msg(Alias, Msg), + unalias(Alias), + ?flush_msg(Msg) + ). + +-define(flush_msg(Msg), + receive Msg -> ok + after 0 -> ok + end + ). + %% It's important that counter is first, since we compare tid's -record(tid, diff --git a/lib/mnesia/src/mnesia_controller.erl b/lib/mnesia/src/mnesia_controller.erl index 0a1e540a8a75..ac73a5ded267 100644 --- a/lib/mnesia/src/mnesia_controller.erl +++ b/lib/mnesia/src/mnesia_controller.erl @@ -99,7 +99,7 @@ dump_and_reply/2, load_and_reply/2, send_and_reply/2, - wait_for_tables_init/2, + wait_for_tables_init/3, connect_nodes2/3 ]). @@ -232,13 +232,16 @@ wait_for_tables(Tabs, Timeout) -> do_wait_for_tables(Tabs, 0) -> reply_wait(Tabs); do_wait_for_tables(Tabs, Timeout) -> - Pid = spawn_link(?MODULE, wait_for_tables_init, [self(), Tabs]), + Alias = alias([reply]), + Pid = spawn_link(?MODULE, wait_for_tables_init, [self(), Alias, Tabs]), receive - {?SERVER_NAME, Pid, Res} -> + {?SERVER_NAME, Alias, Res} -> Res; {'EXIT', Pid, _} -> + unalias(Alias), reply_wait(Tabs) after Timeout -> + ?unalias_and_flush_msg(Alias, {?SERVER_NAME, Alias, _}), unlink(Pid), exit(Pid, timeout), reply_wait(Tabs) @@ -256,10 +259,10 @@ reply_wait(Tabs) -> catch exit:_ -> {error, {node_not_running, node()}} end. -wait_for_tables_init(From, Tabs) -> +wait_for_tables_init(From, Alias, Tabs) -> process_flag(trap_exit, true), Res = wait_for_init(From, Tabs, whereis(?SERVER_NAME)), - From ! {?SERVER_NAME, self(), Res}, + Alias ! {?SERVER_NAME, Alias, Res}, unlink(From), exit(normal). @@ -1306,7 +1309,7 @@ handle_info(Msg = {'EXIT', Pid, R}, State) when R /= wait_for_tables_timeout -> end; handle_info({From, get_state}, State) -> - From ! {?SERVER_NAME, State}, + From ! {?SERVER_NAME, From, State}, noreply(State); %% No real need for buffering @@ -1860,12 +1863,14 @@ get_info(Timeout) -> undefined -> {timeout, Timeout}; Pid -> - Pid ! {self(), get_state}, + Alias = alias([reply]), + Pid ! {Alias, get_state}, receive - {?SERVER_NAME, State = #state{loader_queue=LQ,late_loader_queue=LLQ}} -> + {?SERVER_NAME, Alias, State = #state{loader_queue=LQ,late_loader_queue=LLQ}} -> {info,State#state{loader_queue=gb_trees:to_list(LQ), late_loader_queue=gb_trees:to_list(LLQ)}} after Timeout -> + ?unalias_and_flush_msg(Alias, {?SERVER_NAME, Alias, _}), {timeout, Timeout} end end. @@ -1875,11 +1880,13 @@ get_workers(Timeout) -> undefined -> {timeout, Timeout}; Pid -> - Pid ! {self(), get_state}, + Alias = alias([reply]), + Pid ! {Alias, get_state}, receive - {?SERVER_NAME, State = #state{}} -> + {?SERVER_NAME, Alias, State = #state{}} -> {workers, get_loaders(State), get_senders(State), State#state.dumper_pid} after Timeout -> + ?unalias_and_flush_msg(Alias, {?SERVER_NAME, Alias, _}), {timeout, Timeout} end end. diff --git a/lib/mnesia/src/mnesia_locker.erl b/lib/mnesia/src/mnesia_locker.erl index aa7f94ee9f59..44516596b587 100644 --- a/lib/mnesia/src/mnesia_locker.erl +++ b/lib/mnesia/src/mnesia_locker.erl @@ -1126,8 +1126,15 @@ rec_requests([], _Oid, _Store) -> ok. get_held_locks() -> - ?MODULE ! {get_table, self(), mnesia_held_locks}, - Locks = receive {mnesia_held_locks, Ls} -> Ls after 5000 -> [] end, + Alias = alias([reply]), + ?MODULE ! {get_table, Alias, mnesia_held_locks}, + Locks = receive + {mnesia_held_locks, Ls} -> + Ls + after 5000 -> + ?unalias_and_flush_msg(Alias, {mnesia_held_locks, _}), + [] + end, rewrite_locks(Locks, []). %% Mnesia internal usage only @@ -1148,8 +1155,15 @@ rewrite_locks([], Acc) -> lists:reverse(Acc). get_lock_queue() -> - ?MODULE ! {get_table, self(), mnesia_lock_queue}, - Q = receive {mnesia_lock_queue, Locks} -> Locks after 5000 -> [] end, + Alias = alias([reply]), + ?MODULE ! {get_table, Alias, mnesia_lock_queue}, + Q = receive + {mnesia_lock_queue, Locks} -> + Locks + after 5000 -> + ?unalias_and_flush_msg(Alias, {mnesia_lock_queue, _}), + [] + end, [{Oid, Op, Pid, Tid, WFT} || {queue, Oid, Tid, Op, Pid, WFT} <- Q]. do_stop() -> diff --git a/lib/mnesia/src/mnesia_tm.erl b/lib/mnesia/src/mnesia_tm.erl index 10def6d3d7a1..e6ec6ea09bb5 100644 --- a/lib/mnesia/src/mnesia_tm.erl +++ b/lib/mnesia/src/mnesia_tm.erl @@ -1108,8 +1108,12 @@ intercept_best_friend([{stop,Fun} | R],Ignore) -> ?CATCH(Fun()), intercept_best_friend(R,Ignore); intercept_best_friend([Pid | R],false) -> - Pid ! {activity_ended, undefined, self()}, + Alias = alias([reply]), + Pid ! {activity_ended, undefined, Alias}, wait_for_best_friend(Pid, 0), + unalias(Alias), + ?flush_msg({activity_ended, _, Pid}), + ?flush_msg({'EXIT', Pid, _}), intercept_best_friend(R,true); intercept_best_friend([_|R],true) -> intercept_best_friend(R,true). @@ -2049,8 +2053,9 @@ sync_send_dirty(Tid, [Head | Tail], Tab, WaitFor) -> Res = do_dirty(Tid, Head), {WF, Res}; true -> - {?MODULE, Node} ! {self(), {sync_dirty, Tid, Head, Tab}}, - sync_send_dirty(Tid, Tail, Tab, [Node | WaitFor]) + Alias = alias([reply]), + {?MODULE, Node} ! {Alias, {sync_dirty, Tid, Head, Tab}}, + sync_send_dirty(Tid, Tail, Tab, [{Node, Alias} | WaitFor]) end; sync_send_dirty(_Tid, [], _Tab, WaitFor) -> {WaitFor, {'EXIT', {aborted, {node_not_running, WaitFor}}}}. @@ -2068,9 +2073,10 @@ async_send_dirty(Tid, [Head | Tail], Tab, ReadNode, WaitFor, Res) -> NewRes = do_dirty(Tid, Head), async_send_dirty(Tid, Tail, Tab, ReadNode, WaitFor, NewRes); ReadNode == Node -> - {?MODULE, Node} ! {self(), {sync_dirty, Tid, Head, Tab}}, + Alias = alias([reply]), + {?MODULE, Node} ! {Alias, {sync_dirty, Tid, Head, Tab}}, NewRes = {'EXIT', {aborted, {node_not_running, Node}}}, - async_send_dirty(Tid, Tail, Tab, ReadNode, [Node | WaitFor], NewRes); + async_send_dirty(Tid, Tail, Tab, ReadNode, [{Node, Alias} | WaitFor], NewRes); true -> {?MODULE, Node} ! {self(), {async_dirty, Tid, Head, Tab}}, async_send_dirty(Tid, Tail, Tab, ReadNode, WaitFor, Res) @@ -2078,8 +2084,16 @@ async_send_dirty(Tid, [Head | Tail], Tab, ReadNode, WaitFor, Res) -> async_send_dirty(_Tid, [], _Tab, _ReadNode, WaitFor, Res) -> {WaitFor, Res}. -rec_dirty([Node | Tail], Res) when Node /= node() -> - NewRes = get_dirty_reply(Node, Res), +rec_dirty([{Node, Alias} | Tail], Res) when Node /= node() -> + NewRes = + try + get_dirty_reply(Node, Res) + after + unalias(Alias), + ?flush_msg({?MODULE, Node, {'EXIT', _}}), + ?flush_msg({?MODULE, Node, {dirty_res, _}}), + ?flush_msg({mnesia_down, Node}) + end, rec_dirty(Tail, NewRes); rec_dirty([], Res) -> Res. @@ -2203,11 +2217,13 @@ get_info(Timeout) -> undefined -> {timeout, Timeout}; Pid -> - Pid ! {self(), info}, + Alias = alias([reply]), + Pid ! {Alias, info}, receive {?MODULE, _, {info, Part, Coord}} -> {info, Part, Coord} after Timeout -> + ?unalias_and_flush_msg(Alias, {?MODULE, _, {info, _, _}}), {timeout, Timeout} end end.