Skip to content

Commit abfb62f

Browse files
committed
Optionally queue outgoing data
Support queueing outgoing stanzas and stream management elements for up to a configurable number of milliseconds (with a configurable queue size limit). This allows for batching up multiple XML elements into a single TCP packet in order to reduce the TCP/IP overhead. The feature is supported by ejabberd_c2s, ejabberd_s2s_out, and ejabberd_service. It can be enabled by configuring the max. number of milliseconds to queue an element (default: 0), and optionally the max. number of elements to queue (default: 10). This can be done by using the following new ejabberd_c2s/ejabberd_service listener options: - max_send_queue_size - max_send_queue_delay For ejabberd_c2s, the following global options can be specified instead: - c2s_max_send_queue_size - c2s_max_send_queue_delay For ejabberd_s2s_out, the following global options can be specified: - s2s_max_send_queue_size - s2s_max_send_queue_delay
1 parent a89b1f3 commit abfb62f

8 files changed

+124
-5
lines changed

rebar.config

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@
7070
{stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.28"}}},
7171
{if_var_true, stun,
7272
{stun, ".*", {git, "https://github.com/processone/stun", {tag, "1.2.2"}}}},
73-
{xmpp, ".*", {git, "https://github.com/processone/xmpp", {tag, "1.5.8"}}},
73+
{xmpp, ".*", {git, "https://github.com/weiss/xmpp", {branch, "feature/send-queue"}}},
7474
{yconf, ".*", {git, "https://github.com/processone/yconf", {tag, "1.0.13"}}}
7575
]}.
7676

src/ejabberd_c2s.erl

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,18 @@ init([State, Opts]) ->
555555
TLSVerify = proplists:get_bool(tls_verify, Opts),
556556
Zlib = proplists:get_bool(zlib, Opts),
557557
Timeout = ejabberd_option:negotiation_timeout(),
558+
MaxQSize = case ejabberd_option:c2s_max_send_queue_size() of
559+
undefined ->
560+
proplists:get_value(max_send_queue_size, Opts, 10);
561+
C2SMaxQSize ->
562+
C2SMaxQSize
563+
end,
564+
MaxQDelay = case ejabberd_option:c2s_max_send_queue_delay() of
565+
undefined ->
566+
proplists:get_value(max_send_queue_delay, Opts, 0);
567+
C2SMaxQDelay ->
568+
C2SMaxQDelay
569+
end,
558570
State1 = State#{tls_options => TLSOpts2,
559571
tls_required => TLSRequired,
560572
tls_enabled => TLSEnabled,
@@ -567,7 +579,8 @@ init([State, Opts]) ->
567579
access => Access,
568580
shaper => Shaper},
569581
State2 = xmpp_stream_in:set_timeout(State1, Timeout),
570-
ejabberd_hooks:run_fold(c2s_init, {ok, State2}, [Opts]).
582+
State3 = xmpp_stream_in:configure_queue(State2, MaxQSize, MaxQDelay),
583+
ejabberd_hooks:run_fold(c2s_init, {ok, State3}, [Opts]).
571584

572585
handle_call(get_presence, From, #{jid := JID} = State) ->
573586
Pres = case maps:get(pres_last, State, error) of
@@ -1022,4 +1035,6 @@ listen_options() ->
10221035
{tls_verify, false},
10231036
{zlib, false},
10241037
{max_stanza_size, infinity},
1038+
{max_send_queue_size, 10},
1039+
{max_send_queue_delay, 0},
10251040
{max_fsm_queue, 10000}].

src/ejabberd_listener.erl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,10 @@ listen_opt_type(tls) ->
680680
econf:bool();
681681
listen_opt_type(max_stanza_size) ->
682682
econf:pos_int(infinity);
683+
listen_opt_type(max_send_queue_size) ->
684+
econf:non_neg_int();
685+
listen_opt_type(max_send_queue_delay) ->
686+
econf:non_neg_int();
683687
listen_opt_type(max_fsm_queue) ->
684688
econf:pos_int();
685689
listen_opt_type(send_timeout) ->

src/ejabberd_option.erl

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
-export([c2s_cafile/0, c2s_cafile/1]).
2323
-export([c2s_ciphers/0, c2s_ciphers/1]).
2424
-export([c2s_dhfile/0, c2s_dhfile/1]).
25+
-export([c2s_max_send_queue_delay/0]).
26+
-export([c2s_max_send_queue_size/0]).
2527
-export([c2s_protocol_options/0, c2s_protocol_options/1]).
2628
-export([c2s_tls_compression/0, c2s_tls_compression/1]).
2729
-export([ca_file/0]).
@@ -124,6 +126,8 @@
124126
-export([s2s_dns_retries/0, s2s_dns_retries/1]).
125127
-export([s2s_dns_timeout/0, s2s_dns_timeout/1]).
126128
-export([s2s_max_retry_delay/0]).
129+
-export([s2s_max_send_queue_delay/0, s2s_max_send_queue_delay/1]).
130+
-export([s2s_max_send_queue_size/0, s2s_max_send_queue_size/1]).
127131
-export([s2s_protocol_options/0, s2s_protocol_options/1]).
128132
-export([s2s_queue_type/0, s2s_queue_type/1]).
129133
-export([s2s_timeout/0, s2s_timeout/1]).
@@ -275,6 +279,14 @@ c2s_dhfile() ->
275279
c2s_dhfile(Host) ->
276280
ejabberd_config:get_option({c2s_dhfile, Host}).
277281

282+
-spec c2s_max_send_queue_delay() -> 'undefined' | non_neg_integer().
283+
c2s_max_send_queue_delay() ->
284+
ejabberd_config:get_option({c2s_max_send_queue_delay, global}).
285+
286+
-spec c2s_max_send_queue_size() -> 'undefined' | non_neg_integer().
287+
c2s_max_send_queue_size() ->
288+
ejabberd_config:get_option({c2s_max_send_queue_size, global}).
289+
278290
-spec c2s_protocol_options() -> 'undefined' | binary().
279291
c2s_protocol_options() ->
280292
c2s_protocol_options(global).
@@ -851,6 +863,20 @@ s2s_dns_timeout(Host) ->
851863
s2s_max_retry_delay() ->
852864
ejabberd_config:get_option({s2s_max_retry_delay, global}).
853865

866+
-spec s2s_max_send_queue_delay() -> 'undefined' | non_neg_integer().
867+
s2s_max_send_queue_delay() ->
868+
s2s_max_send_queue_delay(global).
869+
-spec s2s_max_send_queue_delay(global | binary()) -> 'undefined' | non_neg_integer().
870+
s2s_max_send_queue_delay(Host) ->
871+
ejabberd_config:get_option({s2s_max_send_queue_delay, Host}).
872+
873+
-spec s2s_max_send_queue_size() -> 'undefined' | non_neg_integer().
874+
s2s_max_send_queue_size() ->
875+
s2s_max_send_queue_size(global).
876+
-spec s2s_max_send_queue_size(global | binary()) -> 'undefined' | non_neg_integer().
877+
s2s_max_send_queue_size(Host) ->
878+
ejabberd_config:get_option({s2s_max_send_queue_size, Host}).
879+
854880
-spec s2s_protocol_options() -> 'undefined' | binary().
855881
s2s_protocol_options() ->
856882
s2s_protocol_options(global).

src/ejabberd_options.erl

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,10 @@ opt_type(c2s_ciphers) ->
9595
end;
9696
opt_type(c2s_dhfile) ->
9797
econf:file();
98+
opt_type(c2s_max_send_queue_delay) ->
99+
econf:non_neg_int();
100+
opt_type(c2s_max_send_queue_size) ->
101+
econf:non_neg_int();
98102
opt_type(c2s_protocol_options) ->
99103
econf:and_then(
100104
econf:list(econf:binary(), [unique]),
@@ -337,6 +341,10 @@ opt_type(s2s_dns_timeout) ->
337341
econf:timeout(second, infinity);
338342
opt_type(s2s_max_retry_delay) ->
339343
econf:timeout(second);
344+
opt_type(s2s_max_send_queue_delay) ->
345+
econf:non_neg_int();
346+
opt_type(s2s_max_send_queue_size) ->
347+
econf:non_neg_int();
340348
opt_type(s2s_protocol_options) ->
341349
opt_type(c2s_protocol_options);
342350
opt_type(s2s_queue_type) ->
@@ -527,6 +535,8 @@ options() ->
527535
{c2s_cafile, undefined},
528536
{c2s_ciphers, undefined},
529537
{c2s_dhfile, undefined},
538+
{c2s_max_send_queue_delay, undefined},
539+
{c2s_max_send_queue_size, undefined},
530540
{c2s_protocol_options, undefined},
531541
{c2s_tls_compression, undefined},
532542
{ca_file, iolist_to_binary(pkix:get_cafile())},
@@ -635,6 +645,8 @@ options() ->
635645
{s2s_dns_retries, 2},
636646
{s2s_dns_timeout, timer:seconds(10)},
637647
{s2s_max_retry_delay, timer:seconds(300)},
648+
{s2s_max_send_queue_delay, 0},
649+
{s2s_max_send_queue_size, 10},
638650
{s2s_protocol_options, undefined},
639651
{s2s_queue_type,
640652
fun(Host) -> ejabberd_config:get_option({queue_type, Host}) end},
@@ -705,6 +717,8 @@ globals() ->
705717
auth_cache_life_time,
706718
auth_cache_missed,
707719
auth_cache_size,
720+
c2s_max_send_queue_delay,
721+
c2s_max_send_queue_size,
708722
ca_file,
709723
captcha_cmd,
710724
captcha_host,
@@ -752,6 +766,8 @@ globals() ->
752766
router_use_cache,
753767
rpc_timeout,
754768
s2s_max_retry_delay,
769+
c2s_max_send_queue_delay,
770+
c2s_max_send_queue_size,
755771
shaper,
756772
sm_cache_life_time,
757773
sm_cache_missed,

src/ejabberd_options_doc.erl

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,31 @@ doc() ->
430430
"dhparam -out dh.pem 2048\". If this option is not specified, "
431431
"2048-bit MODP Group with 256-bit Prime Order Subgroup will be "
432432
"used as defined in RFC5114 Section 2.3.")}},
433+
{c2s_max_send_queue_delay,
434+
#{value => ?T("non_neg_integer()"),
435+
desc =>
436+
[?T("Specifies the maximum number of milliseconds to queue an "
437+
"outgoing stanza or stream management element. Setting this "
438+
"option to a positive (non-zero) number allows for batching up "
439+
"multiple XML elements into a single TCP packet in order to "
440+
"reduce the TCP/IP overhead. The default value is '0', which "
441+
"disables queueing."), "",
442+
?T("To set a specific file per listener, use the listener's "
443+
"http://../listen-options/#max_send_queue_delay[max_send_queue_delay] "
444+
"option. Please note that 'c2s_max_send_queue_delay' overrides "
445+
"the listener's 'max_send_queue_delay' option."), ""]}},
446+
{c2s_max_send_queue_size,
447+
#{value => ?T("non_neg_integer()"),
448+
desc =>
449+
[?T("Specifies the maximum number of elements to add to the send "
450+
"queue. The default value is '10'. Note that this option has "
451+
"no effect if 'max_send_queue_delay' isn't set to a value "
452+
"larger than '0'. Setting this option to '0' disables "
453+
"queueing."), "",
454+
?T("To set a specific file per listener, use the listener's "
455+
"http://../listen-options/#max_send_queue_size[max_send_queue_size] "
456+
"option. Please note that 'c2s_max_send_queue_size' overrides "
457+
"the listener's 'max_send_queue_size' option."), ""]}},
433458
{c2s_protocol_options,
434459
#{value => "[Option, ...]",
435460
desc =>
@@ -1118,6 +1143,31 @@ doc() ->
11181143
"dhparam -out dh.pem 2048\". If this option is not specified, "
11191144
"2048-bit MODP Group with 256-bit Prime Order Subgroup will be "
11201145
"used as defined in RFC5114 Section 2.3.")}},
1146+
{s2s_max_send_queue_delay,
1147+
#{value => ?T("non_neg_integer()"),
1148+
desc =>
1149+
[?T("Specifies the maximum number of milliseconds to queue an "
1150+
"outgoing stanza or stream management element. Setting this "
1151+
"option to a positive (non-zero) number allows for batching up "
1152+
"multiple XML elements into a single TCP packet in order to "
1153+
"reduce the TCP/IP overhead. The default value is '0', which "
1154+
"disables queueing."), "",
1155+
?T("To set a specific file per listener, use the listener's "
1156+
"http://../listen-options/#max_send_queue_delay[max_send_queue_delay] "
1157+
"option. Please note that 's2s_max_send_queue_delay' overrides "
1158+
"the listener's 'max_send_queue_delay' option."), ""]}},
1159+
{s2s_max_send_queue_size,
1160+
#{value => ?T("non_neg_integer()"),
1161+
desc =>
1162+
[?T("Specifies the maximum number of elements to add to the send "
1163+
"queue. The default value is '10'. Note that this option has "
1164+
"no effect if 'max_send_queue_delay' isn't set to a value "
1165+
"larger than '0'. Setting this option to '0' disables "
1166+
"queueing."), "",
1167+
?T("To set a specific file per listener, use the listener's "
1168+
"http://../listen-options/#max_send_queue_size[max_send_queue_size] "
1169+
"option. Please note that 's2s_max_send_queue_size' overrides "
1170+
"the listener's 'max_send_queue_size' option."), ""]}},
11211171
{s2s_protocol_options,
11221172
#{value => "[Option, ...]",
11231173
desc =>

src/ejabberd_s2s_out.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,16 +279,19 @@ init([#{server := LServer, remote_server := RServer} = State, Opts]) ->
279279
false -> unlimited
280280
end,
281281
Timeout = ejabberd_option:negotiation_timeout(),
282+
MaxQSize = ejabberd_option:s2s_max_send_queue_size(),
283+
MaxQDelay = ejabberd_option:s2s_max_send_queue_delay(),
282284
State1 = State#{on_route => queue,
283285
queue => p1_queue:new(QueueType, QueueLimit),
284286
xmlns => ?NS_SERVER,
285287
lang => ejabberd_option:language(),
286288
server_host => ServerHost,
287289
shaper => none},
288290
State2 = xmpp_stream_out:set_timeout(State1, Timeout),
291+
State3 = xmpp_stream_out:configure_queue(State2, MaxQSize, MaxQDelay),
289292
?INFO_MSG("Outbound s2s connection started: ~ts -> ~ts",
290293
[LServer, RServer]),
291-
ejabberd_hooks:run_fold(s2s_out_init, ServerHost, {ok, State2}, [Opts]).
294+
ejabberd_hooks:run_fold(s2s_out_init, ServerHost, {ok, State3}, [Opts]).
292295

293296
handle_call(Request, From, #{server_host := ServerHost} = State) ->
294297
ejabberd_hooks:run_fold(s2s_out_handle_call, ServerHost, State, [Request, From]).

src/ejabberd_service.erl

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,13 @@ init([State, Opts]) ->
117117
true -> TLSOpts1
118118
end,
119119
GlobalRoutes = proplists:get_value(global_routes, Opts, true),
120+
MaxQSize = proplists:get_value(max_send_queue_size, Opts, 10),
121+
MaxQDelay = proplists:get_value(max_send_queue_delay, Opts, 0),
120122
Timeout = ejabberd_option:negotiation_timeout(),
121123
State1 = xmpp_stream_in:change_shaper(State, ejabberd_shaper:new(Shaper)),
122124
State2 = xmpp_stream_in:set_timeout(State1, Timeout),
123-
State3 = State2#{access => Access,
125+
State3 = xmpp_stream_in:configure_queue(State2, MaxQSize, MaxQDelay),
126+
State4 = State3#{access => Access,
124127
xmlns => ?NS_COMPONENT,
125128
lang => ejabberd_option:language(),
126129
server => ejabberd_config:get_myname(),
@@ -129,7 +132,7 @@ init([State, Opts]) ->
129132
tls_options => TLSOpts,
130133
global_routes => GlobalRoutes,
131134
check_from => CheckFrom},
132-
ejabberd_hooks:run_fold(component_init, {ok, State3}, [Opts]).
135+
ejabberd_hooks:run_fold(component_init, {ok, State4}, [Opts]).
133136

134137
handle_stream_start(_StreamStart,
135138
#{remote_server := RemoteServer,
@@ -302,6 +305,8 @@ listen_options() ->
302305
{tls, false},
303306
{tls_compression, false},
304307
{max_stanza_size, infinity},
308+
{max_send_queue_size, 10},
309+
{max_send_queue_delay, 0},
305310
{max_fsm_queue, 10000},
306311
{password, undefined},
307312
{hosts, []},

0 commit comments

Comments
 (0)