-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgg_conf.erl
311 lines (257 loc) · 11.7 KB
/
gg_conf.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
%%--------------------------------------------------------------------
%% Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
%% SPDX-License-Identifier: Apache-2.0
%%--------------------------------------------------------------------
-module(gg_conf).
-export([auth_mode/0]).
-export([start/0, stop/0]).
-export([receive_conf_updates/0, do_receive_conf_updates/0, request_update/0, request_update_sync/0]).
-export([register_config_change_handler/2]).
-type(auth_mode() :: enabled | bypass_on_failure | bypass).
-export_type([auth_mode/0]).
-define(CONF_TIMEOUT_MILLIS, 30000).
%% TODO figure out how to import emqx_conf.hrl instead
-define(READONLY_KEYS, [<<"cluster">>, <<"rpc">>, <<"node">>]).
%% config keys
-define(KEY_EMQX_CONFIG, <<"emqxConfig">>).
-define(KEY_AUTH_MODE, <<"authMode">>).
%% defaults
-define(DEFAULT_AUTH_MODE, enabled).
-define(SCHEMA_ROOT, list_to_binary(gg_schema:namespace())).
-define(ENV_APP, aws_greengrass_emqx_auth).
-define(KEY_DEFAULT_EMQX_CONF, default_emqx_conf).
-define(KEY_CONFIG_CHANGE_HANDLER, config_change_handler).
-define(CONF_OPTS, #{override_to => cluster}).
-define(UPDATE_PROC, gg_conf_listen_for_updates).
-define(CONF_UPDATE, conf_updated).
%%--------------------------------------------------------------------
%% Config API
%%--------------------------------------------------------------------
-spec(auth_mode() -> auth_mode()).
auth_mode() ->
application:get_env(?ENV_APP, ?KEY_AUTH_MODE, ?DEFAULT_AUTH_MODE).
%%--------------------------------------------------------------------
%% On Config Change Callbacks
%%--------------------------------------------------------------------
register_config_change_handler(Path, Handler) ->
Handlers = application:get_env(?ENV_APP, ?KEY_CONFIG_CHANGE_HANDLER, #{}),
application:set_env(?ENV_APP, ?KEY_CONFIG_CHANGE_HANDLER, Handlers#{Path => Handler}).
get_config_change_handler(Path) ->
Handlers = application:get_env(?ENV_APP, ?KEY_CONFIG_CHANGE_HANDLER, #{}),
maps:get(Path, Handlers, undefined).
fire_config_change(Path, NewVal) ->
case get_config_change_handler(Path) of
undefined -> skip;
Handler -> Handler(NewVal)
end.
%%--------------------------------------------------------------------
%% Greengrass EMQX Defaults
%%--------------------------------------------------------------------
greengrass_emqx_default_conf() ->
application:get_env(?ENV_APP, ?KEY_DEFAULT_EMQX_CONF, #{}).
load_greengrass_emqx_default_conf() ->
%% we know this is valid conf because it's also used in emqx.conf
Conf0 = case hocon:load(emqx:etc_file("gg.emqx.conf")) of
{ok, C} ->
normalize_map(C);
{error, Err} -> exit({error, {unable_to_read_config, Err}})
end,
%% also include env overrides because we set some in the recipe (e.g. ssl keyfile/certfile)
Conf1 = hocon_tconf:merge_env_overrides(emqx_schema, Conf0, all, #{format => map}),
%% some fields are lost during the override merge, bring them back in
Conf2 = hocon:deep_merge(Conf0, Conf1),
application:set_env(?ENV_APP, ?KEY_DEFAULT_EMQX_CONF, Conf2).
%%--------------------------------------------------------------------
%% Config Update Listener
%%--------------------------------------------------------------------
%% Subscribe to configuration updates and perform a one-time configuration update.
start() ->
case load_greengrass_emqx_default_conf() of
ok -> ok;
Err -> exit(Err)
end,
receive_conf_updates(),
gg_port_driver:subscribe_to_configuration_updates(fun request_update/0),
case request_update_sync() of
ok -> ok;
Error -> exit(Error)
end.
stop() ->
?UPDATE_PROC ! stop.
receive_conf_updates() ->
ListenPID = spawn(?MODULE, do_receive_conf_updates, []),
register(?UPDATE_PROC, ListenPID).
do_receive_conf_updates() ->
receive
{update, Pid} ->
logger:info("Config update request received"),
case update_conf_from_ipc() of
ok -> Pid ! ?CONF_UPDATE;
Err -> logger:error("Unable to get configuration, err=~p", [Err])
end,
do_receive_conf_updates();
stop -> ok
end.
request_update_sync() ->
request_update(),
wait_for_config_update(?CONF_TIMEOUT_MILLIS).
request_update() ->
logger:info("Requesting configuration update"),
?UPDATE_PROC ! {update, self()}.
wait_for_config_update(Timeout) ->
receive
?CONF_UPDATE -> ok
after Timeout ->
{error, "Timed out waiting for configuration"}
end.
%%--------------------------------------------------------------------
%% IPC Config Update Handler
%%--------------------------------------------------------------------
update_conf_from_ipc() ->
case gg_port_driver:get_configuration() of
{ok, Conf} ->
update_conf(Conf),
ok;
{error, _} = Error -> Error
end.
update_conf(NewComponentConf) when is_binary(NewComponentConf) ->
case decode_conf(NewComponentConf) of
{error, Err} -> logger:warning("Unable to decode configuration, skipping config update: ~p", [Err]);
DecodedConf -> update_conf(DecodedConf)
end;
update_conf(NewComponentConf) ->
ExistingOverrideConf =
case emqx_config:read_override_conf(?CONF_OPTS) of
undefined -> #{};
BadConf when is_list(BadConf) ->
logger:warning("Unexpected list format found when retrieving existing configuration, treating as empty"),
#{};
Conf -> Conf
end,
update_conf(ExistingOverrideConf, NewComponentConf).
decode_conf(Conf) when is_binary(Conf) ->
case catch jiffy:decode(Conf, [return_maps, dedupe_keys, {null_term, undefined}]) of
DecodedConf when is_map(DecodedConf) -> DecodedConf;
Err -> {error, Err}
end.
update_conf(ExistingOverrideConf, NewComponentConf) ->
ExistingConf = normalize_map(ExistingOverrideConf),
NewConf = normalize_map(NewComponentConf),
PluginConf = get_plugin_conf(NewConf),
try update_plugin_conf(PluginConf)
catch
PluginUpdateErr -> logger:warning("Unable to update plugin configuration: ~p", [PluginUpdateErr])
end,
OverrideConf = get_override_conf(NewConf),
logger:debug("Updating emqx override config. existing=~p, override=~p", [ExistingConf, OverrideConf]),
case catch update_override_conf(ExistingConf, OverrideConf) of
ok -> put_verify_fun();
OverrideUpdateError -> logger:warning("Unable to update emqx override configuration: ~p", [OverrideUpdateError])
end.
put_verify_fun() ->
put_verify_fun(auth_mode()).
put_verify_fun(AuthMode) when AuthMode =/= bypass ->
%% TODO don't do this if already set
case gg_listeners:put_verify_fun(ssl, default, fun gg_tls:verify_client_certificate/3) of
ok -> logger:info("SSL listener custom verify fun set");
Err -> logger:error("Failed to set listener verify fun: ~p", [Err])
end;
put_verify_fun(_AuthMode) ->
%% TODO only do this workaround on startup? listener restarts will disconnect clients
emqx_listeners:restart_listener(ssl, default, gg_listeners:get_listener_config(ssl, default)).
%%--------------------------------------------------------------------
%% Update Plugin Config
%%--------------------------------------------------------------------
get_plugin_conf(Conf) ->
%% Unknown fields not allowed when checking schema
Fields = lists:map(fun(SchemaEntry) -> atom_to_binary(element(1, SchemaEntry)) end, gg_schema:fields(gg_schema:namespace())),
PluginConf = maps:filter(fun(Key, _) -> lists:member(Key, Fields) end, Conf),
validate_plugin_conf(PluginConf).
validate_plugin_conf(PluginConf) ->
try
{_, CheckedConf} = hocon_tconf:map_translate(gg_schema, #{?SCHEMA_ROOT => PluginConf}, #{return_plain => true, format => map}),
maps:get(?SCHEMA_ROOT, normalize_map(CheckedConf))
catch throw:E:ST ->
{error, {config_validation, E, ST}}
end.
update_plugin_conf(PluginConf) ->
update_plugin_conf(PluginConf, ?KEY_AUTH_MODE, ?DEFAULT_AUTH_MODE).
update_plugin_conf(PluginConf, Key, Default) ->
PrevVal = application:get_env(?ENV_APP, Key, undefined),
NewVal = maps:get(Key, PluginConf, Default),
if PrevVal =/= NewVal ->
application:set_env(?ENV_APP, Key, NewVal),
logger:info("Updated ~p plugin config to ~p", [Key, NewVal]),
fire_config_change(Key, NewVal);
true -> skip
end.
%%--------------------------------------------------------------------
%% Update EMQX Override Config
%%--------------------------------------------------------------------
get_override_conf(Conf) ->
PluginConf = get_plugin_conf(Conf),
Conf1 = maps:get(?KEY_EMQX_CONFIG, Conf, #{}),
Conf2 = hocon:deep_merge(greengrass_emqx_default_conf(), Conf1),
Conf3 = maps:filter(fun(K,_) -> not maps:is_key(K, PluginConf) end, Conf2),
Conf4 = maps:filter(fun(Key, _) -> not lists:member(Key, ?READONLY_KEYS) end, Conf3),
Conf5 = no_cacertfile_workaround(Conf4),
Conf5.
%% We don't set cacertfile in ssl options. In order for this to take
%% in emqx_conf_cli:load_config, we must remove it from the configuration map.
%% Setting cacertfile to null, undefined, or empty string does not work (as of EMQX 5.1.1)
no_cacertfile_workaround(#{<<"listeners">> := Val} = Conf) ->
Conf#{<<"listeners">> => no_cacertfile_workaround(Val)};
no_cacertfile_workaround(#{<<"ssl">> := Val} = Conf) ->
Conf#{<<"ssl">> => no_cacertfile_workaround(Val)};
no_cacertfile_workaround(#{<<"default">> := Val} = Conf) ->
Conf#{<<"default">> => no_cacertfile_workaround(Val)};
no_cacertfile_workaround(#{<<"ssl_options">> := #{<<"cacertfile">> := CA} = Val} = Conf) when CA == null; CA == undefined ->
Conf#{<<"ssl_options">> => maps:remove(<<"cacertfile">>, Val)};
no_cacertfile_workaround(Conf)->
Conf.
update_override_conf(ExistingConf, #{} = NewConf) when map_size(NewConf) == 0 ->
clear_override_conf(ExistingConf);
update_override_conf(ExistingConf, NewConf) ->
MapToClear = maps:filter(fun(K,_) -> not maps:is_key(K, NewConf) end, ExistingConf),
clear_override_conf(MapToClear),
emqx_conf_cli:load_config(emqx_utils_json:encode(NewConf), replace).
clear_override_conf(Conf) when is_map(Conf) ->
ConfPaths0 = leaf_config_paths(Conf),
ConfPaths1 = uniq(ConfPaths0),
lists:foreach(fun remove_override_conf/1, ConfPaths1).
remove_override_conf([]) ->
ok;
remove_override_conf([_]) -> %% EMQX doesn't allow root key removal
ok;
remove_override_conf(Path) when is_list(Path) ->
case catch emqx_conf:remove(Path, ?CONF_OPTS) of
{ok, _} -> logger:info("Removed ~p config", [Path]);
{error, RemoveError} -> logger:warning("Failed to remove configuration. confPath=~p, error=~p", [Path, RemoveError]);
Err -> logger:warning("Failed to remove configuration. confPath=~p, error=~p", [Path, Err])
end,
%% remove subpaths as well, EMQX applications may be listening for config changes on a subpath,
%% so deleting just leaf configs may not trigger these listeners.
%% for example, bridges only reacts when [bridges, http, <bridge_name>] path changes.
remove_override_conf(lists:droplast(Path)).
leaf_config_paths(Conf) when is_map(Conf) ->
lists:map(fun ({Key, Val}) -> lists:flatten([Key] ++ leaf_config_paths(Val)) end, maps:to_list(Conf));
leaf_config_paths(_) ->
[].
normalize_map(Map) ->
Map1 = emqx_utils_maps:binary_key_map(Map), %% configs like authentication only work with binary keys
Map2 = replace_value(Map1, fun(V) -> V == null end, undefined), %% null is okay in emqx.conf but apparently not during an update
Map2.
replace_value(Element, Predicate, NewVal) when is_map(Element) ->
maps:fold(fun(K,V,Acc) -> maps:put(K, replace_value(V, Predicate, NewVal), Acc) end, #{}, Element);
replace_value(Element, Predicate, NewVal) when is_list(Element) ->
lists:map(fun(Elem) -> replace_value(Elem, Predicate, NewVal) end, Element);
replace_value(Element, Predicate, NewVal) ->
case Predicate(Element) of
true -> NewVal;
_ -> Element
end.
%% use lists:uniq instead when we upgrade to OTP 25
uniq([]) ->
[];
uniq([H|T]) ->
[H | [X || X <- uniq(T), H /= X]].