Skip to content

Commit a5c11b2

Browse files
Merge pull request #214 from inaka/disable_queue_manager
Make it possible to disable the queue manager
2 parents 60c2a89 + 1284852 commit a5c11b2

File tree

4 files changed

+59
-5
lines changed

4 files changed

+59
-5
lines changed

src/wpool.erl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,12 @@
151151
%%
152152
%% This option can take values `lifo' or `fifo'. Defaults to `fifo'.
153153

154+
-type enable_queues() :: boolean().
155+
%% A boolean value determining if `queue_manager' should be started for queueing requests.
156+
%%
157+
%% Defaults to `true'.
158+
%% Note that disabling this will disable `available_worker' and `next_available_worker' strategies.
159+
154160
-type enable_callbacks() :: boolean().
155161
%% A boolean value determining if `event_manager' should be started for callback modules.
156162
%%
@@ -202,6 +208,7 @@
202208
{pool_sup_period, pool_sup_period()} |
203209
{queue_type, queue_type()} |
204210
{enable_callbacks, enable_callbacks()} |
211+
{enable_queues, enable_queues()} |
205212
{callbacks, callbacks()}.
206213
%% Options that can be provided to a new pool.
207214
%%
@@ -221,6 +228,7 @@
221228
pool_sup_period => pool_sup_period(),
222229
queue_type => queue_type(),
223230
enable_callbacks => enable_callbacks(),
231+
enable_queues => enable_queues(),
224232
callbacks => callbacks(),
225233
_ => _}.
226234
%% Options that can be provided to a new pool.

src/wpool_pool.erl

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -366,8 +366,9 @@ init({Name, Options}) ->
366366
_Wpool = store_wpool(Name, Size, Options),
367367

368368
WorkerOpts0 =
369-
[{queue_manager, QueueManagerName}, {time_checker, TimeCheckerName}
370-
| maybe_event_manager(Options, {event_manager, EventManagerName})],
369+
[{time_checker, TimeCheckerName}]
370+
++ maybe_queue_manager(Options, {queue_manager, QueueManagerName})
371+
++ maybe_event_manager(Options, {event_manager, EventManagerName}),
371372
WorkerOpts =
372373
maps:merge(
373374
maps:from_list(WorkerOpts0), Options),
@@ -406,7 +407,8 @@ init({Name, Options}) ->
406407
[wpool_process_sup]},
407408

408409
Children =
409-
[TimeCheckerSpec, QueueManagerSpec]
410+
[TimeCheckerSpec]
411+
++ maybe_queue_manager(Options, QueueManagerSpec)
410412
++ maybe_event_manager(Options, EventManagerSpec)
411413
++ [ProcessSupSpec],
412414

@@ -566,6 +568,11 @@ build_wpool(Name) ->
566568
undefined
567569
end.
568570

571+
maybe_queue_manager(#{enable_queues := false}, _) ->
572+
[];
573+
maybe_queue_manager(_, Item) ->
574+
[Item].
575+
569576
maybe_event_manager(#{enable_callbacks := true}, Item) ->
570577
[Item];
571578
maybe_event_manager(_, _) ->

src/wpool_queue_manager.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,12 @@ worker_dead(QueueManager, Worker) ->
141141
%% @see wpool_pool:stats/1
142142
-spec pending_task_count(queue_mgr()) -> non_neg_integer().
143143
pending_task_count(QueueManager) ->
144-
gen_server:call(QueueManager, pending_task_count).
144+
try
145+
gen_server:call(QueueManager, pending_task_count)
146+
catch
147+
_:{noproc, _} ->
148+
0
149+
end.
145150

146151
%%%===================================================================
147152
%%% gen_server callbacks

test/wpool_pool_SUITE.erl

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
-export([init_per_suite/1, end_per_suite/1, init_per_testcase/2, end_per_testcase/2]).
2828
-export([stop_worker/1, best_worker/1, next_worker/1, random_worker/1, available_worker/1,
2929
hash_worker/1, custom_worker/1, next_available_worker/1, wpool_record/1,
30-
queue_type_fifo/1, queue_type_lifo/1, get_workers/1]).
30+
queue_type_fifo/1, queue_type_lifo/1, get_workers/1, no_queue_manager/1]).
3131
-export([manager_crash/1, super_fast/1, mess_up_with_store/1]).
3232

3333
-elvis([{elvis_style, no_block_expressions, disable}]).
@@ -50,6 +50,9 @@ end_per_suite(Config) ->
5050
Config.
5151

5252
-spec init_per_testcase(atom(), config()) -> config().
53+
init_per_testcase(no_queue_manager = TestCase, Config) ->
54+
{ok, _} = wpool:start_pool(TestCase, [{workers, 1}, {enable_queues, false}]),
55+
Config;
5356
init_per_testcase(queue_type_lifo = TestCase, Config) ->
5457
{ok, _} = wpool:start_pool(TestCase, [{workers, 1}, {queue_type, lifo}]),
5558
Config;
@@ -454,6 +457,37 @@ manager_crash(_Config) ->
454457

455458
{comment, []}.
456459

460+
-spec no_queue_manager(config()) -> {comment, []}.
461+
no_queue_manager(_Config) ->
462+
Pool = no_queue_manager,
463+
464+
ct:log("Check that the pool is working"),
465+
{ok, ok} = wpool:call(Pool, {io, format, ["ok!~n"]}, random_worker),
466+
{ok, ok} = wpool:call(Pool, {io, format, ["ok!~n"]}, next_worker),
467+
{ok, ok} = wpool:call(Pool, {io, format, ["ok!~n"]}, {hash_worker, self()}),
468+
469+
ct:log("Impossible task"),
470+
Self = self(),
471+
try wpool:call(Pool, {erlang, send, [Self, something]}, available_worker, 0) of
472+
R ->
473+
ct:fail("Unexpected ~p", [R])
474+
catch
475+
_:no_workers ->
476+
ok
477+
end,
478+
479+
0 = proplists:get_value(total_message_queue_len, wpool:stats(Pool)),
480+
481+
ct:log("Wait a second and nothing gets here"),
482+
receive
483+
X ->
484+
ct:fail("Unexpected ~p", [X])
485+
after 1000 ->
486+
ok
487+
end,
488+
489+
{comment, []}.
490+
457491
-spec super_fast(config()) -> {comment, []}.
458492
super_fast(_Config) ->
459493
Pool = super_fast,

0 commit comments

Comments
 (0)