Skip to content

Commit d688cd9

Browse files
author
ElFantasma
authored
Prevent mero fail when fails a single cluster (#83)
* Added checks for failing clusters * Added support for failing and recovering clusters * Renamed Retry/Retries variables and added some formatting to tests * Renamed Retries variable * Changed mero config handling to allow addition or removal of cluster configurations * Removed test case declarations * Removed retry behavior. Now is not necessary as we are retrying on every heartbeat * Reverted a small change
1 parent 1f04576 commit d688cd9

File tree

5 files changed

+271
-59
lines changed

5 files changed

+271
-59
lines changed

src/mero_conf.erl

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@
3131

3232
-author('Miriam Pena <[email protected]>').
3333

34-
%% # of times we'll retry to get the configuration from elasticache before raising an error
35-
-define(MAX_RETRIES, 4).
36-
3734
%% It's dynamically invoked using rpc:pmap/3
3835
-ignore_xref({?MODULE, get_elasticache_cluster_configs, 1}).
3936

@@ -46,8 +43,7 @@
4643
pool_max_connection_delay_time/1, pool_min_connection_interval/1,
4744
max_connection_delay_time/1, stat_callback/0, stat_callback/1, add_now/1, add_now/2,
4845
millis_to/1, millis_to/2, process_server_specs/1, elasticache_load_config_delay/0,
49-
elasticache_load_config_delay/1, monitor_heartbeat_delay/0, monitor_heartbeat_delay/2,
50-
get_elasticache_cluster_configs/1]).
46+
elasticache_load_config_delay/1, monitor_heartbeat_delay/0, monitor_heartbeat_delay/2]).
5147

5248
-include_lib("mero/include/mero.hrl").
5349

@@ -220,14 +216,32 @@ millis_to(TimeLimit, Then) ->
220216
end.
221217

222218
-spec process_server_specs(mero:cluster_config()) -> mero:cluster_config().
223-
process_server_specs(Clusters) ->
224-
[{ClusterName, [process_value(Attr) || Attr <- Attrs]}
225-
|| {ClusterName, Attrs} <- Clusters].
219+
process_server_specs([]) ->
220+
[];
221+
process_server_specs([Cluster | Clusters]) ->
222+
case process_server_spec(Cluster) of
223+
{error, _} ->
224+
process_server_specs(Clusters);
225+
Config ->
226+
[Config | process_server_specs(Clusters)]
227+
end.
226228

227229
%%%=============================================================================
228230
%%% Internal functions
229231
%%%=============================================================================
230232

233+
process_server_spec({ClusterName, Attrs}) ->
234+
try
235+
{ClusterName, [process_value(Attr) || Attr <- Attrs]}
236+
catch
237+
Kind:Desc:Stack ->
238+
error_logger:error_report([{error, mero_config_failed},
239+
{kind, Kind},
240+
{desc, Desc},
241+
{stack, Stack}]),
242+
{error, Desc}
243+
end.
244+
231245
get_env(Key) ->
232246
case application:get_env(mero, Key) of
233247
{ok, Value} ->
@@ -273,18 +287,9 @@ get_elasticache_cluster_configs({Host, Port}) ->
273287
[get_elasticache_cluster_config(Host, Port)].
274288

275289
get_elasticache_cluster_config(Host, Port) ->
276-
get_elasticache_cluster_config(Host,
277-
Port,
278-
0,
279-
mero_elasticache:get_cluster_config(Host, Port)).
280-
281-
get_elasticache_cluster_config(_Host, _Port, _Retries, {ok, Entries}) ->
282-
Entries;
283-
get_elasticache_cluster_config(Host, Port, ?MAX_RETRIES, {error, Reason}) ->
284-
error({Reason, Host, Port});
285-
get_elasticache_cluster_config(Host, Port, Retries, {error, _Reason}) ->
286-
timer:sleep(trunc(math:pow(2, Retries)) * 100),
287-
get_elasticache_cluster_config(Host,
288-
Port,
289-
Retries + 1,
290-
mero_elasticache:get_cluster_config(Host, Port)).
290+
case mero_elasticache:get_cluster_config(Host, Port) of
291+
{ok, Entries} ->
292+
Entries;
293+
{error, Reason} ->
294+
error({Reason, Host, Port})
295+
end.

src/mero_conf_monitor.erl

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ update_cluster_defs(NewProcessedConfig, State) ->
121121
ok = mero_cluster:load_clusters(NewProcessedConfig),
122122
NewClusterVersion = mero_cluster:version(),
123123

124-
ok = update_clusters(OldProcessedConfig, NewProcessedConfig),
124+
ok = update_clusters(lists:sort(OldProcessedConfig), lists:sort(NewProcessedConfig)),
125125

126126
ok = purge_if_version_changed(OldClusterVersion, NewClusterVersion),
127127

@@ -132,26 +132,47 @@ purge_if_version_changed(ClusterVersion, ClusterVersion) ->
132132
purge_if_version_changed(_OldVersion, _NewClusterVersion) ->
133133
mero_cluster:purge().
134134

135-
%% NOTE: since both cluster definitions are generated through mero_conf:process_server_specs/1
136-
%% with the same input, we can be sure that the resulting lists will contain the same number
137-
%% of elements, with the same keys in the same order.
135+
%% NOTE: Old ClusterDefs and New ones are sorted by ClusterName, so it is possible to detect
136+
%% when some Def is present or missing in either list
138137
update_clusters([], []) ->
139138
ok;
139+
update_clusters([],
140+
[{ClusterName, _} | NewClusterDefs]) -> %% NewCluster is new, start it
141+
mero_sup:start_child(ClusterName),
142+
update_clusters([], NewClusterDefs);
143+
update_clusters([{ClusterName, _} | OldClusterDefs],
144+
[]) -> %% OldCluster not present, terminate it
145+
mero_sup:terminate_child(ClusterName),
146+
update_clusters(OldClusterDefs, []);
140147
update_clusters([ClusterDef | OldClusterDefs],
141148
[ClusterDef | NewClusterDefs]) -> %% nothing changed
142149
update_clusters(OldClusterDefs, NewClusterDefs);
143150
update_clusters([{ClusterName, OldAttrs} | OldClusterDefs],
144-
[{ClusterName, NewAttrs} | NewClusterDefs]) ->
145-
OldServers =
146-
lists:sort(
147-
proplists:get_value(servers, OldAttrs)),
151+
[{ClusterName, NewAttrs} | NewClusterDefs]) -> %% config changed, check it
152+
OldServers = get_servers(OldAttrs),
148153
ok =
149-
case lists:sort(
150-
proplists:get_value(servers, NewAttrs))
151-
of
152-
OldServers ->
153-
ok; %% Nothing of relevance changed
154-
_ ->
154+
case get_servers(NewAttrs) of
155+
OldServers -> %% Nothing of relevance changed
156+
ok;
157+
_ -> %% Different servers, restart the cluster
155158
mero_sup:restart_child(ClusterName)
156159
end,
160+
update_clusters(OldClusterDefs, NewClusterDefs);
161+
update_clusters([{OldClusterName, _} | OldClusterDefs],
162+
[{NewClusterName, _} | _] = NewClusterDefs)
163+
when OldClusterName < NewClusterName -> %% OldCluster not present, terminate it
164+
mero_sup:terminate_child(OldClusterName),
165+
update_clusters(OldClusterDefs, NewClusterDefs);
166+
update_clusters([{OldClusterName, _} | _] = OldClusterDefs,
167+
[{NewClusterName, _} | NewClusterDefs])
168+
when OldClusterName > NewClusterName -> %% NewCluster is new, start it
169+
mero_sup:start_child(NewClusterName),
157170
update_clusters(OldClusterDefs, NewClusterDefs).
171+
172+
get_servers(Attrs) ->
173+
case proplists:get_value(servers, Attrs) of
174+
Error when is_tuple(Error) ->
175+
Error;
176+
Servers when is_list(Servers) ->
177+
lists:sort(Servers)
178+
end.

src/mero_sup.erl

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030

3131
-author('Miriam Pena <[email protected]>').
3232

33-
-export([start_link/1, restart_child/1, init/1]).
33+
-export([start_link/1, start_child/1, restart_child/1, terminate_child/1, init/1]).
3434

3535
-behaviour(supervisor).
3636

@@ -49,11 +49,21 @@
4949
start_link(OrigConfig) ->
5050
supervisor:start_link({local, ?MODULE}, ?MODULE, #{orig_config => OrigConfig}).
5151

52+
-spec start_child(ClusterName :: atom()) -> ok.
53+
start_child(ClusterName) ->
54+
{ok, _} = supervisor:start_child(?MODULE, cluster_sup_spec(ClusterName)),
55+
ok.
56+
5257
-spec restart_child(ClusterName :: atom()) -> ok.
5358
restart_child(ClusterName) ->
59+
terminate_child(ClusterName),
60+
start_child(ClusterName),
61+
ok.
62+
63+
-spec terminate_child(ClusterName :: atom()) -> ok.
64+
terminate_child(ClusterName) ->
5465
_ = supervisor:terminate_child(?MODULE, ClusterName),
5566
_ = supervisor:delete_child(?MODULE, ClusterName),
56-
{ok, _} = supervisor:start_child(?MODULE, cluster_sup_spec(ClusterName)),
5767
ok.
5868

5969
%%%===================================================================

test/mero_conf_SUITE.erl

Lines changed: 66 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@
3737
-export([all/0, init_per_testcase/2, end_per_testcase/2, helper_mfa_config_function/0,
3838
diff/1, process_server_specs_a_compatible/1, process_server_specs_a/1,
3939
process_server_specs_a_alternate/1, process_server_specs_a_b/1,
40-
process_server_specs_a_b_c/1, process_server_specs_mfa/1, per_pool_config/1]).
40+
process_server_specs_a_b_c/1, process_server_specs_mfa/1, per_pool_config/1,
41+
process_server_specs_with_failure/1]).
4142

4243
all() ->
4344
[diff,
@@ -47,7 +48,8 @@ all() ->
4748
process_server_specs_a_b,
4849
process_server_specs_a_b_c,
4950
process_server_specs_mfa,
50-
per_pool_config].
51+
per_pool_config,
52+
process_server_specs_with_failure].
5153

5254
init_per_testcase(diff, Conf) ->
5355
Conf;
@@ -65,22 +67,24 @@ init_per_testcase(_, Conf) ->
6567
"c3.com|10.102.00.102|11211 c4.com|10.102.00.102|11211\n">>,
6668
meck:expect(mero_elasticache,
6769
request_response,
68-
fun(Type, _, _, _) ->
69-
HostLines =
70-
case Type of
71-
a ->
72-
HostLinea;
73-
b ->
74-
HostLineb;
75-
c ->
76-
HostLinec
77-
end,
78-
{ok,
79-
[{banner, <<"CONFIG cluster ...">>},
80-
{version, <<"version1">>},
81-
{hosts, HostLines},
82-
{crlf, <<"\r\n">>},
83-
{eom, <<"END\r\n">>}]}
70+
fun (failing_cluster, _, _, _) ->
71+
{error, econnrefused};
72+
(Type, _, _, _) ->
73+
HostLines =
74+
case Type of
75+
a ->
76+
HostLinea;
77+
b ->
78+
HostLineb;
79+
c ->
80+
HostLinec
81+
end,
82+
{ok,
83+
[{banner, <<"CONFIG cluster ...">>},
84+
{version, <<"version1">>},
85+
{hosts, HostLines},
86+
{crlf, <<"\r\n">>},
87+
{eom, <<"END\r\n">>}]}
8488
end),
8589
Conf.
8690

@@ -235,3 +239,47 @@ per_pool_config(_Conf) ->
235239
?assertEqual(30, mero_conf:pool_initial_connections(pool_1)),
236240
?assertEqual(50, mero_conf:pool_initial_connections(pool_2)),
237241
ok.
242+
243+
process_server_specs_with_failure(_Conf) ->
244+
meck:new(error_logger, [unstick, passthrough]),
245+
Spec =
246+
[{cluster_a,
247+
[{servers, {elasticache, [{a, 11211, 2}]}},
248+
{sharding_algorithm, {mero, shard_crc32}},
249+
{workers_per_shard, 1},
250+
{pool_worker_module, mero_wrk_tcp_binary}]},
251+
{cluster_b,
252+
[{servers, {elasticache, [{failing_cluster, 11211, 3}]}},
253+
{sharding_algorithm, {mero, shard_crc32}},
254+
{workers_per_shard, 1},
255+
{pool_worker_module, mero_wrk_tcp_binary}]},
256+
{cluster_c,
257+
[{servers, {elasticache, [{c, 11211, 1}]}},
258+
{sharding_algorithm, {mero, shard_crc32}},
259+
{workers_per_shard, 1},
260+
{pool_worker_module, mero_wrk_tcp_binary}]}],
261+
[{cluster_a, ServerSpecsA}, {cluster_c, ServerSpecsC}] =
262+
mero_conf:process_server_specs(Spec),
263+
?assertEqual([{"a1.com", 11211},
264+
{"a2.com", 11211},
265+
{"a3.com", 11211},
266+
{"a1.com", 11211},
267+
{"a2.com", 11211},
268+
{"a3.com", 11211}],
269+
proplists:get_value(servers, ServerSpecsA)),
270+
?assertEqual(mero_wrk_tcp_binary, proplists:get_value(pool_worker_module, ServerSpecsA)),
271+
?assertEqual(1, proplists:get_value(workers_per_shard, ServerSpecsA)),
272+
?assertEqual({mero, shard_crc32}, proplists:get_value(sharding_algorithm, ServerSpecsA)),
273+
274+
?assertEqual(1,
275+
meck:num_calls(error_logger,
276+
error_report,
277+
[[{error, mero_config_failed},
278+
{kind, error},
279+
{desc, {econnrefused, failing_cluster, 11211}},
280+
{stack, '_'}]])),
281+
282+
?assertEqual([{"c1.com", 11211}, {"c2.com", 11211}, {"c3.com", 11211}, {"c4.com", 11211}],
283+
proplists:get_value(servers, ServerSpecsC)),
284+
meck:unload([error_logger]),
285+
ok.

0 commit comments

Comments
 (0)