Skip to content

Commit 57cee85

Browse files
Merge pull request #9590 from rickard-green/rickard/prio-msg-fix/OTP-19198
Priority message fixes
2 parents 96fa8a9 + 422e145 commit 57cee85

File tree

3 files changed

+115
-34
lines changed

3 files changed

+115
-34
lines changed

erts/emulator/beam/erl_proc_sig_queue.c

Lines changed: 44 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -303,11 +303,11 @@ static void proc_sig_queue_unlock_buffer(ErtsSignalInQueueBuffer* slot);
303303
static void handle_message_enqueued_tracing(Process *c_p,
304304
ErtsSigRecvTracing *tracing,
305305
ErtsMessage *msg);
306-
static void
306+
static int
307307
insert_prepared_prio_msg(Process *c_p, ErtsSigRecvTracing *tracing,
308308
ErtsMessage *sig, Eterm message, Eterm token,
309309
ErtsMessage ***next_nm_sig);
310-
static void
310+
static int
311311
insert_prepared_prio_msg_attached(Process *c_p, ErtsSigRecvTracing *tracing,
312312
ErtsMessage *sig, void *attached,
313313
Eterm message, Eterm token,
@@ -4703,9 +4703,9 @@ handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing,
47034703
: ((ErtsSeqTokenExitSignalData *) xsigd)->token);
47044704
if (prio) {
47054705
remove_nm_sig(c_p, sig, next_nm_sig);
4706-
insert_prepared_prio_msg(c_p, tracing, sig,
4707-
xsigd->message, token,
4708-
next_nm_sig);
4706+
cnt += insert_prepared_prio_msg(c_p, tracing, sig,
4707+
xsigd->message, token,
4708+
next_nm_sig);
47094709
}
47104710
else {
47114711
convert_prepared_sig_to_msg(c_p, tracing, sig,
@@ -4778,17 +4778,19 @@ convert_prepared_down_message(Process *c_p, ErtsSigRecvTracing *tracing,
47784778
int prio, ErtsMessage *sig, Eterm msg,
47794779
ErtsMessage ***next_nm_sig)
47804780
{
4781+
int cnt = 0;
47814782
if (prio) {
47824783
remove_nm_sig(c_p, sig, next_nm_sig);
4783-
insert_prepared_prio_msg(c_p, tracing, sig, msg, am_undefined,
4784-
next_nm_sig);
4784+
cnt += insert_prepared_prio_msg(c_p, tracing, sig, msg, am_undefined,
4785+
next_nm_sig);
47854786
}
47864787
else {
47874788
convert_prepared_sig_to_msg(c_p, tracing, sig, msg, am_undefined,
47884789
next_nm_sig);
4790+
cnt++;
47894791
}
47904792
erts_proc_notify_new_message(c_p, ERTS_PROC_LOCK_MAIN);
4791-
return 1;
4793+
return cnt;
47924794
}
47934795

47944796
static int
@@ -4998,11 +5000,10 @@ convert_to_down_message(Process *c_p,
49985000
*priop = !!((*omon)->flags & ERTS_ML_FLG_PRIO_ML);
49995001
if (*priop) {
50005002
remove_nm_sig(c_p, sig, next_nm_sig);
5001-
insert_prepared_prio_msg_attached(c_p, tracing, mp,
5002-
mp->data.attached, message,
5003-
am_undefined,
5004-
next_nm_sig);
5005-
cnt += 4;
5003+
cnt += insert_prepared_prio_msg_attached(c_p, tracing, mp,
5004+
mp->data.attached, message,
5005+
am_undefined,
5006+
next_nm_sig);
50065007
goto notify_new_message;
50075008
}
50085009

@@ -6061,9 +6062,9 @@ handle_altact_msg(Process *c_p, ErtsSigRecvTracing *tracing,
60616062
data_attached, next_nm_sig);
60626063
}
60636064
else {
6064-
insert_prepared_prio_msg_attached(c_p, tracing, sig,
6065-
data_attached, msg, token,
6066-
next_nm_sig);
6065+
cnt += insert_prepared_prio_msg_attached(c_p, tracing, sig,
6066+
data_attached, msg, token,
6067+
next_nm_sig);
60676068
}
60686069
cnt++;
60696070
break;
@@ -6128,8 +6129,8 @@ handle_altact_msg(Process *c_p, ErtsSigRecvTracing *tracing,
61286129
/* drop faulty encoded external message... */
61296130
return cnt;
61306131
}
6131-
insert_prepared_prio_msg(c_p, tracing, mp, ERL_MESSAGE_TERM(mp),
6132-
token, next_nm_sig);
6132+
cnt += insert_prepared_prio_msg(c_p, tracing, mp, ERL_MESSAGE_TERM(mp),
6133+
token, next_nm_sig);
61336134
}
61346135
break;
61356136
}
@@ -9171,13 +9172,13 @@ prio_queue_check_recv_marks(Eterm obj, Eterm *ref,
91719172
}
91729173
}
91739174

9174-
static void
9175+
static int
91759176
insert_prepared_prio_msg_attached(Process *c_p, ErtsSigRecvTracing *tracing,
91769177
ErtsMessage *sig, void *data_attached,
91779178
Eterm message, Eterm token,
91789179
ErtsMessage ***next_nm_sig)
91799180
{
9180-
int i, empty_prio_q = !(c_p->sig_qs.flags & FS_PRIO_MQ_END_MARK);
9181+
int i, cnt = 1, empty_prio_q = !(c_p->sig_qs.flags & FS_PRIO_MQ_END_MARK);
91819182
ErtsRecvMarkerBlock *blk = c_p->sig_qs.recv_mrk_blk;
91829183
ErtsPrioQInfo *pq_info = get_prio_queue_info(c_p);
91839184
ErtsRecvMarker *pq_end = &pq_info->marker[ERTS_PRIO_Q_MARK_END];
@@ -9225,6 +9226,7 @@ insert_prepared_prio_msg_attached(Process *c_p, ErtsSigRecvTracing *tracing,
92259226

92269227
/* Note that this ref has been seen in the prio queue... */
92279228
mark->in_prioq = !0;
9229+
cnt += 4;
92289230
}
92299231
}
92309232
}
@@ -9333,33 +9335,46 @@ insert_prepared_prio_msg_attached(Process *c_p, ErtsSigRecvTracing *tracing,
93339335
}
93349336
}
93359337

9336-
/* Append the actual prio message to the prio queue... */
93379338
sig->data.attached = data_attached;
93389339

93399340
ERL_MESSAGE_TERM(sig) = message;
93409341
ERL_MESSAGE_TOKEN(sig) = token;
93419342

9343+
if (tracing->messages.active) {
9344+
if (ERTS_SIG_IS_EXTERNAL_MSG(sig)) {
9345+
cnt += 50; /* Decode is expensive... */
9346+
if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN,
9347+
sig, 0)) {
9348+
/* Bad dist message; drop it... */
9349+
sig->next = NULL;
9350+
erts_cleanup_messages(sig);
9351+
return cnt;
9352+
}
9353+
}
9354+
handle_message_enqueued_tracing(c_p, tracing, sig);
9355+
}
9356+
9357+
/* Append the actual prio message to the prio queue... */
9358+
93429359
*pq_end->prev_next = sig;
93439360
pq_end->prev_next = &sig->next;
93449361
sig->next = (ErtsMessage *) pq_end;
93459362

93469363
c_p->sig_qs.mq_len++;
93479364
erts_chk_sys_mon_long_msgq_on(c_p);
93489365

9349-
if (tracing->messages.active)
9350-
handle_message_enqueued_tracing(c_p, tracing, sig);
9351-
9366+
return cnt;
93529367
}
93539368

9354-
static void
9369+
static int
93559370
insert_prepared_prio_msg(Process *c_p, ErtsSigRecvTracing *tracing,
93569371
ErtsMessage *sig, Eterm message, Eterm token,
93579372
ErtsMessage ***next_nm_sig)
93589373
{
9359-
insert_prepared_prio_msg_attached(c_p, tracing, sig,
9360-
ERTS_MSG_COMBINED_HFRAG,
9361-
message, token,
9362-
next_nm_sig);
9374+
return insert_prepared_prio_msg_attached(c_p, tracing, sig,
9375+
ERTS_MSG_COMBINED_HFRAG,
9376+
message, token,
9377+
next_nm_sig);
93639378
}
93649379

93659380
static void

erts/emulator/test/signal_SUITE.erl

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2169,23 +2169,25 @@ priority_messages_old_nodes_test(Config, Node) ->
21692169
exit(Alias, prio_exit_1, [priority]),
21702170
wait_until(fun () -> lists:member(Node, nodes()) end),
21712171
exit(Alias, prio_exit_2, [priority]),
2172+
Pid ! msg_2,
2173+
wait_until(fun () -> msg_received(Pid, msg_2) end),
21722174

21732175
node_disconnect(Node),
21742176
exit(Pid, prio_exit_3, [priority]),
21752177
wait_until(fun () -> msg_received(Pid, {'EXIT', Self, prio_exit_3}) end),
21762178
exit(Pid, prio_exit_4, [priority]),
21772179
wait_until(fun () -> msg_received(Pid, {'EXIT', Self, prio_exit_4}) end),
2178-
case PrioMsgSupport of
2180+
case PrioMsgSupport of
21792181
true ->
21802182
{messages, [prio_msg_1, prio_msg_2, {'EXIT', Self, prio_exit_1},
21812183
{'EXIT', Self, prio_exit_2}, msg_1, prio_msg_3,
2182-
prio_msg_4, prio_msg_5, prio_msg_6, {'EXIT', Self, prio_exit_3},
2183-
{'EXIT', Self, prio_exit_4}]}
2184+
prio_msg_4, prio_msg_5, prio_msg_6, msg_2,
2185+
{'EXIT', Self, prio_exit_3}, {'EXIT', Self, prio_exit_4}]}
21842186
= proc_info(Pid, messages);
21852187
false ->
21862188
{messages, [msg_1, prio_msg_1, prio_msg_2, prio_msg_3, prio_msg_4,
2187-
prio_msg_5, prio_msg_6, {'EXIT', Self, prio_exit_3},
2188-
{'EXIT', Self, prio_exit_4}]}
2189+
prio_msg_5, prio_msg_6, msg_2,
2190+
{'EXIT', Self, prio_exit_3}, {'EXIT', Self, prio_exit_4}]}
21892191
= proc_info(Pid, messages)
21902192
end,
21912193

erts/emulator/test/trace_SUITE.erl

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
init_per_testcase/2, end_per_testcase/2,
3131
link_receive_call_correlation/0,
3232
receive_trace/1, receive_trace_non_fetching_receiver/1,
33+
receive_trace_priority_messages/1,
3334
link_receive_call_correlation/1, self_send/1,
3435
timeout_trace/1, send_trace/1,
3536
procs_trace/1, dist_procs_trace/1, procs_new_trace/1,
@@ -69,6 +70,7 @@ groups() ->
6970

7071
testcases() ->
7172
[cpu_timestamp, receive_trace, receive_trace_non_fetching_receiver,
73+
receive_trace_priority_messages,
7274
link_receive_call_correlation, self_send, timeout_trace,
7375
send_trace, procs_trace, dist_procs_trace, suspend,
7476
suspend_exit, suspender_exit,
@@ -310,6 +312,68 @@ receive_trace_non_fetching_receiver_test(Node) ->
310312
_ = process_flag(trap_exit, TrapExit),
311313
ok.
312314

315+
receive_trace_priority_messages(Config) when is_list(Config) ->
316+
receive_trace_priority_messages_test(node()),
317+
{ok, Peer, Node} = ?CT_PEER(),
318+
receive_trace_priority_messages_test(Node),
319+
peer:stop(Peer),
320+
ok.
321+
322+
receive_trace_priority_messages_test(Node) ->
323+
Tester = self(),
324+
325+
PrioReceiver = spawn_link(fun () ->
326+
Tester ! {self(), alias([priority])},
327+
receive after infinity -> ok end
328+
end),
329+
PriorityAlias = receive
330+
{PrioReceiver, PrioAlias} ->
331+
PrioAlias
332+
end,
333+
334+
1 = erlang_trace(PrioReceiver, true, ['receive']),
335+
336+
Msgs = [{msg, 1}, {prio_msg, 1}, {msg, 2}, {prio_msg, 2}, {msg, 3},
337+
{msg, 4}, {prio_msg, 3}, {prio_msg, 4}, {msg, 5}, {prio_msg, 5},
338+
{msg, 6}, {msg, 7}, {msg, 8}, {prio_msg, 6}, {prio_msg, 7}],
339+
340+
ok = erpc:call(Node,
341+
fun () ->
342+
lists:foreach(
343+
fun (Msg) ->
344+
Tmo = rand:uniform(101) - 1,
345+
receive after Tmo -> ok end,
346+
case Msg of
347+
{msg, _} ->
348+
PrioReceiver ! Msg;
349+
{prio_msg, _} ->
350+
erlang:send(PriorityAlias, Msg,
351+
[priority])
352+
end
353+
end, Msgs),
354+
ok
355+
end),
356+
357+
lists:foreach(fun (Msg) ->
358+
{trace, PrioReceiver, 'receive', Msg}
359+
= receive_first_trace()
360+
end, Msgs),
361+
362+
MsgQ = lists:sort(fun ({X, XA}, {X, XB}) ->
363+
XA =< XB;
364+
({prio_msg, _}, {msg, _}) ->
365+
true;
366+
({msg, _}, {prio_msg, _}) ->
367+
false
368+
end, Msgs),
369+
370+
{messages, MsgQ} = process_info(PrioReceiver, messages),
371+
372+
unlink(PrioReceiver),
373+
exit(PrioReceiver, kill),
374+
false = is_process_alive(PrioReceiver),
375+
ok.
376+
313377
%% Tests that receive of a message always happens before a call with
314378
%% that message and that links/unlinks are ordered together with the
315379
%% 'receive'.

0 commit comments

Comments
 (0)