Skip to content

Commit 99c180f

Browse files
Djordjezerth
authored andcommitted
Auto discovery (#17)
* elasticache cluster auto discovery
1 parent 8032480 commit 99c180f

File tree

3 files changed

+96
-2
lines changed

3 files changed

+96
-2
lines changed

Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ clean:
2424

2525
test: logs_clean all
2626
@$(REBAR) skip_deps=true ct
27+
@$(REBAR) skip_deps=true verbose=1 eunit
2728

2829
testfast:
2930
@$(REBAR) skip_deps=true ct
31+
@$(REBAR) skip_deps=true eunit
3032

3133
xref:
3234
@$(REBAR) skip_deps=true xref
33-

README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,22 @@ algorithms available are `shard_phash2` and `shard_crc32`.
6969
7070
```
7171

72+
Cluster Auto Discovery
73+
======================
74+
75+
Configuration can also be implemented to support auto discovery of the cluster as opposed to hardcoding nodes. Provide the configuration endpoint ([AWS Reference](http://docs.aws.amazon.com/AmazonElastiCache/latest/UserGuide/AutoDiscovery.html)) and port as in the following example.
76+
77+
```
78+
[{cluster_b,
79+
[{servers, {elasticache, "ConfigEndpointHostname.com", PortNumber}},
80+
{sharding_algorithm, {mero, shard_crc32}},
81+
{workers_per_shard, 1},
82+
{pool_worker_module, mero_wrk_tcp_binary}]
83+
},
84+
...
85+
]
86+
87+
```
7288

7389
Using Mero:
7490
===============

src/mero_sup.erl

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,14 @@
4949
-spec start_link(ClusterConfig :: list({ClusterName :: atom(),
5050
Config :: list()})) ->
5151
{ok, Pid :: pid()} | {error, Reason :: term()}.
52-
start_link(ClusterConfig) ->
52+
start_link(Config) ->
53+
ClusterConfig = process_server_specs(Config),
54+
5355
ok = mero_cluster:load_clusters(ClusterConfig),
5456
PoolDefs = mero_cluster:child_definitions(),
5557
supervisor:start_link({local, ?MODULE}, ?MODULE, [PoolDefs]).
5658

59+
5760
%%%===================================================================
5861
%%% Supervisor callbacks
5962
%%%===================================================================
@@ -68,3 +71,77 @@ init([PoolDefs]) ->
6871
child(I, Type, {ClusterName, Host, Port, Name, WrkModule}) ->
6972
{Name, {I, start_link, [ClusterName, Host, Port, Name, WrkModule]}, permanent,
7073
5000, Type, [I]}.
74+
75+
76+
%%%===================================================================
77+
%%% Internal functions
78+
%%%===================================================================
79+
80+
%% Given an elasticache config Endpoint and port, return parsed list of {host, port} nodes in cluster
81+
-spec get_cluster_config(string(), integer()) -> list({string(), integer()}).
82+
get_cluster_config(ConfigHost, ConfigPort) ->
83+
{ok, [{banner, <<"CONFIG cluster", _/binary>>},
84+
{version, VersionLine},
85+
{hosts, HostLine},
86+
{crlf, <<"\r\n">>},
87+
{eom, <<"END\r\n">>}]} =
88+
request_response(ConfigHost, ConfigPort,
89+
<<"config get cluster\n">>,
90+
[banner, version, hosts, crlf, eom]),
91+
{_Version, Hosts} = parse_cluster_config(HostLine, VersionLine),
92+
Hosts.
93+
94+
process_server_specs(L) ->
95+
lists:foldl(fun ({ClusterName, AttrPlist}, Acc) ->
96+
[{ClusterName, [process_value(Attr)
97+
|| Attr <- AttrPlist]} | Acc]
98+
end, [], L).
99+
100+
process_value({servers, {elasticache, ConfigEndpoint, ConfigPort}}) ->
101+
HostsPorts = get_cluster_config(ConfigEndpoint, ConfigPort),
102+
{servers, HostsPorts};
103+
process_value(V) ->
104+
V.
105+
106+
request_response(Host, Port, Command, Names) ->
107+
Opts = [binary, {packet, line}, {active, false}],
108+
{ok, Socket} = gen_tcp:connect(Host, Port, Opts),
109+
ok = gen_tcp:send(Socket, Command),
110+
Lines = [{Name, begin
111+
{ok, Line} = gen_tcp:recv(Socket, 0, 1000),
112+
Line
113+
end}
114+
|| Name <- Names],
115+
ok = gen_tcp:close(Socket),
116+
{ok, Lines}.
117+
118+
%% Parse host and version lines to return version and list of {host, port} cluster nodes
119+
-spec parse_cluster_config(binary(), binary()) -> {integer(), [{string(), integer()}]}.
120+
parse_cluster_config(HostLine, VersionLine) ->
121+
HostSpecs = re:split(butlast(HostLine), <<" ">>),
122+
{binary_to_integer(butlast(VersionLine)),
123+
[begin
124+
[Host, _IP, Port] = re:split(HIP, "\\|"),
125+
{binary_to_list(Host), binary_to_integer(Port)}
126+
end
127+
|| HIP <- HostSpecs]}.
128+
129+
butlast(<<>>) -> <<>>;
130+
butlast(Bin) -> binary:part(Bin, {0, size(Bin) - 1}).
131+
132+
%%%===================================================================
133+
%%% Unit tests
134+
%%%===================================================================
135+
136+
-ifdef(TEST).
137+
138+
-include_lib("eunit/include/eunit.hrl").
139+
140+
get_cluster_config_test() ->
141+
VersionLine = <<"1\n">>,
142+
HostLine = <<"server1.cache.amazonaws.com|10.100.100.100|11211 server2.cache.amazonaws.com|10.101.101.00|11211 server3.cache.amazonaws.com|10.102.00.102|11211\n">>,
143+
ExpectedParse = {1, [{"server1.cache.amazonaws.com", 11211}, {"server2.cache.amazonaws.com", 11211}, {"server3.cache.amazonaws.com", 11211}]},
144+
145+
?assertEqual(ExpectedParse, parse_cluster_config(HostLine, VersionLine)).
146+
147+
-endif.

0 commit comments

Comments
 (0)