Skip to content

Commit 740d333

Browse files
committed
Reconnect in the same process
This commit drops the spawned process used for reconnecting. Instead, reconnect attempts are scheduled using timers in the client process itself. The option `reconnect_sleep` is still the delay between failed connection attempts, and it now also applies to the time between a successful connect and the first reconnect, if the connection is lost just after connecting. However, there is no delay before reconnecting if the connection has been up for at least `reconnect_sleep` milliseconds.
1 parent f73ebd5 commit 740d333

File tree

4 files changed

+124
-131
lines changed

4 files changed

+124
-131
lines changed

include/eredis_sub.hrl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
transport :: gen_tcp | ssl,
1212
socket :: gen_tcp:socket() | ssl:sslsocket() | undefined,
13+
reconnect_timer :: reference() | undefined,
1314
parser_state :: #pstate{} | undefined,
1415

1516
%% Channels we should subscribe to

src/eredis_client.erl

Lines changed: 51 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
terminate/2, code_change/3]).
3737

3838
%% Used by eredis_sub_client.erl
39-
-export([read_database/1, get_auth_command/2, connect/7, reconnect_loop/9]).
39+
-export([read_database/1, get_auth_command/2, connect/7]).
4040

4141
-record(state, {
4242
host :: string() | {local, string()} | undefined,
@@ -50,6 +50,7 @@
5050

5151
transport :: gen_tcp | ssl,
5252
socket :: gen_tcp:socket() | ssl:sslsocket() | undefined,
53+
reconnect_timer :: reference() | undefined,
5354
parser_state :: #pstate{} | undefined,
5455
queue :: eredis_queue() | undefined
5556
}).
@@ -156,18 +157,15 @@ handle_info({Type, Socket, Data},
156157
ok ->
157158
{noreply, State};
158159
{error, Reason} ->
159-
Transport:close(Socket),
160160
maybe_reconnect(Reason, State)
161161
end;
162162

163163
%% Socket errors. If the network or peer is down, the error is not
164164
%% always followed by a tcp_closed.
165165
%%
166166
%% TLS 1.3: Called after a connect when the client certificate has expired
167-
handle_info({Error, Socket, Reason},
168-
#state{socket = Socket, transport = Transport} = State)
167+
handle_info({Error, Socket, Reason}, #state{socket = Socket} = State)
169168
when Error =:= tcp_error; Error =:= ssl_error ->
170-
Transport:close(Socket),
171169
maybe_reconnect(Reason, State);
172170

173171
%% Socket got closed, for example by Redis terminating idle
@@ -179,7 +177,7 @@ handle_info({Error, Socket, Reason},
179177
handle_info({Closed, Socket}, #state{socket = OurSocket} = State)
180178
when Closed =:= tcp_closed orelse Closed =:= ssl_closed,
181179
Socket =:= OurSocket orelse Socket =:= fake_socket ->
182-
maybe_reconnect(Closed, State);
180+
maybe_reconnect(Closed, State#state{socket = undefined});
183181

184182
%% Ignore messages and errors for an old socket.
185183
handle_info({Type, Socket, _}, #state{socket = OurSocket} = State)
@@ -198,9 +196,7 @@ handle_info({Type, Socket}, #state{socket = OurSocket} = State)
198196

199197
%% Errors returned by gen_tcp:send/2 and ssl:send/2 are handled
200198
%% asynchronously by message passing to self.
201-
handle_info({send_error, Socket, Reason},
202-
#state{transport = Transport, socket = Socket} = State) ->
203-
Transport:close(Socket),
199+
handle_info({send_error, Socket, Reason}, #state{socket = Socket} = State) ->
204200
maybe_reconnect(Reason, State);
205201

206202
handle_info({send_error, _Socket, _Reason}, State) ->
@@ -217,22 +213,20 @@ handle_info({connection_ready, Socket}, #state{socket = undefined} = State) ->
217213
handle_info(stop, State) ->
218214
{stop, shutdown, State};
219215

220-
handle_info(initiate_connection,
221-
#state{socket = undefined,
222-
reconnect_sleep = ReconnectSleep} = State) ->
216+
handle_info(initiate_connection, #state{socket = undefined} = State) ->
223217
case connect(State) of
224218
{ok, NewState} ->
225219
{noreply, NewState};
226-
{error, _Reason} when ReconnectSleep =:= no_reconnect ->
227-
{stop, normal, State};
228220
{error, Reason} ->
229-
erlang:send_after(ReconnectSleep, self(), {reconnect, Reason}),
230-
{noreply, State}
221+
{noreply, schedule_reconnect(Reason, State)}
231222
end;
232223

233224
handle_info({reconnect, Reason}, #state{socket = undefined} = State) ->
234-
%% Scheduled reconnect
235-
maybe_reconnect(Reason, State);
225+
%% Scheduled reconnect, if disconnected.
226+
maybe_reconnect(Reason, State#state{reconnect_timer = undefined});
227+
handle_info({reconnect, _Reason}, State) ->
228+
%% Already connected.
229+
{noreply, State#state{reconnect_timer = undefined}};
236230

237231
handle_info(_Info, State) ->
238232
{noreply, State}.
@@ -363,7 +357,9 @@ safe_send(Pid, Value) ->
363357
end.
364358

365359
%% @doc: Helper for connecting to Redis, authenticating and selecting
366-
%% the correct database synchronously.
360+
%% the correct database synchronously. On successful connect, a reconnect
361+
%% is scheduled, just in case the connection breaks immediately afterwards,
362+
%% so we don't reconnect until reconnect_sleep milliseconds has elapsed.
367363
%% Returns: {ok, State} or {error, Reason}.
368364
connect(#state{host = Host,
369365
port = Port,
@@ -375,7 +371,11 @@ connect(#state{host = Host,
375371
case connect(Host, Port, SocketOptions, TlsOptions,
376372
ConnectTimeout, AuthCmd, Db) of
377373
{ok, Socket} ->
378-
{ok, State#state{socket = Socket}};
374+
%% In case the connection terminates immediately (this happens with
375+
%% an expired certificate with TLS 1.3) schedule a reconnect already
376+
%% so that we don't try to reconnect if an error is received before
377+
%% reconnect_sleep milliseconds has elapsed.
378+
{ok, schedule_reconnect(unknown, State#state{socket = Socket})};
379379
Error ->
380380
Error
381381
end.
@@ -529,64 +529,53 @@ transport_module(_) -> ssl.
529529
setopts(Socket, _Transport=gen_tcp, Opts) -> inet:setopts(Socket, Opts);
530530
setopts(Socket, _Transport=ssl, Opts) -> ssl:setopts(Socket, Opts).
531531

532+
close_socket(#state{socket = undefined} = State) ->
533+
State;
534+
close_socket(#state{socket = Socket, transport = Transport} = State) ->
535+
Transport:close(Socket),
536+
State#state{socket = undefined}.
537+
538+
%% @doc Schedules a reconnect attempt, if reconnect is enabled.
539+
-spec schedule_reconnect(Reason :: any(), #state{}) -> #state{}.
540+
schedule_reconnect(_Reason, #state{reconnect_sleep = no_reconnect} = State) ->
541+
State;
542+
schedule_reconnect(Reason, #state{reconnect_sleep = ReconnectSleep,
543+
reconnect_timer = undefined} = State) ->
544+
TRef = erlang:send_after(ReconnectSleep, self(), {reconnect, Reason}),
545+
State#state{reconnect_timer = TRef}.
546+
547+
%% @doc Reconnects, but not if a reconnect has been scheduled or if reconnect is
548+
%% disabled. The socket in the state is closed, if any. Returns {noreply, State}
549+
%% or {stop, ExitReason, State} like handle_info.
532550
maybe_reconnect(Reason, #state{reconnect_sleep = no_reconnect, queue = Queue} = State) ->
533551
reply_all({error, Reason}, Queue),
534552
%% If we aren't going to reconnect, then there is nothing else for
535553
%% this process to do.
536-
{stop, normal, State#state{socket = undefined}};
554+
{stop, normal, close_socket(State)};
555+
maybe_reconnect(Reason, #state{queue = Queue, reconnect_timer = TRef} = State)
556+
when is_reference(TRef) ->
557+
%% Reconnect already scheduled.
558+
reply_all({error, Reason}, Queue),
559+
{noreply, close_socket(State#state{queue = queue:new()})};
537560
maybe_reconnect(Reason,
538561
#state{queue = Queue,
539562
host = Host,
540563
port = Port,
541-
socket_options = SocketOptions,
542-
tls_options = TlsOptions,
543-
connect_timeout = ConnectTimeout,
544-
reconnect_sleep = ReconnectSleep,
545-
auth_cmd = AuthCmd,
546-
database = Db} = State) ->
564+
reconnect_timer = undefined} = State) ->
547565
error_logger:error_msg("eredis: Re-establishing connection to ~p:~p due to ~p",
548566
[Host, Port, Reason]),
549-
Self = self(),
550-
spawn_link(fun() ->
551-
process_flag(trap_exit, true),
552-
reconnect_loop(Self, ReconnectSleep, Host, Port,
553-
SocketOptions, TlsOptions, ConnectTimeout,
554-
AuthCmd, Db)
555-
end),
556-
557-
%% tell all of our clients what has happened.
567+
%% Tell all of our clients what has happened.
558568
reply_all({error, Reason}, Queue),
559569

560570
%% Throw away the socket and the queue, as we will never get a
561571
%% response to the requests sent on the old socket. The absence of
562572
%% a socket is used to signal we are "down"
563-
{noreply, State#state{socket = undefined, queue = queue:new()}}.
564-
565-
%% @doc: Loop until a connection can be established, this includes
566-
%% successfully issuing the auth and select calls. When we have a
567-
%% connection, send the socket to Client in a message on the form
568-
%% `{connection_ready, Socket}'.
569-
reconnect_loop(Client, ReconnectSleep, Host, Port, SocketOptions,
570-
TlsOptions, ConnectTimeout, AuthCmd, Db) ->
571-
Client ! reconnect_attempt,
572-
case connect(Host, Port, SocketOptions, TlsOptions, ConnectTimeout,
573-
AuthCmd, Db) of
574-
{ok, Socket} ->
575-
Client ! {connection_ready, Socket},
576-
Transport = transport_module(TlsOptions),
577-
Transport:controlling_process(Socket, Client),
578-
Msgs = get_all_messages([]),
579-
[Client ! M || M <- Msgs];
573+
State1 = close_socket(State#state{queue = queue:new()}),
574+
case connect(State1) of
575+
{ok, State2} ->
576+
{noreply, State2};
580577
{error, Reason} ->
581-
Client ! {reconnect_failed, Reason},
582-
receive
583-
{'EXIT', Client, Reason} -> exit(Reason)
584-
after
585-
ReconnectSleep ->
586-
reconnect_loop(Client, ReconnectSleep, Host, Port,
587-
SocketOptions, TlsOptions, ConnectTimeout,
588-
AuthCmd, Db)
589-
end
578+
{noreply, schedule_reconnect(Reason, State1)}
590579
end.
591580

592581
read_database(undefined) ->
@@ -605,11 +594,3 @@ get_auth_command(undefined, Password) ->
605594
eredis:create_multibulk([<<"AUTH">>, Password]);
606595
get_auth_command(Username, Password) ->
607596
eredis:create_multibulk([<<"AUTH">>, Username, Password]).
608-
609-
get_all_messages(Acc) ->
610-
receive
611-
M ->
612-
get_all_messages([M | Acc])
613-
after 0 ->
614-
lists:reverse(Acc)
615-
end.

src/eredis_sub_client.erl

Lines changed: 68 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -201,10 +201,8 @@ handle_info({Type, Socket, Bs},
201201
{noreply, NewState}
202202
end;
203203

204-
handle_info({Error, Socket, _Reason},
205-
#state{socket = Socket, transport = Transport} = State)
204+
handle_info({Error, Socket, _Reason}, #state{socket = Socket} = State)
206205
when Error =:= tcp_error; Error =:= ssl_error ->
207-
Transport:close(Socket),
208206
maybe_reconnect(Error, State);
209207

210208
%% Socket got closed, for example by Redis terminating idle
@@ -214,49 +212,26 @@ handle_info({Error, Socket, _Reason},
214212
handle_info({Closed, Socket}, #state{socket = OurSocket} = State)
215213
when Closed =:= tcp_closed orelse Closed =:= ssl_closed,
216214
Socket =:= OurSocket orelse Socket =:= fake_socket ->
217-
maybe_reconnect(Closed, State);
215+
send_to_controller({eredis_disconnected, self()}, State),
216+
maybe_reconnect(Closed, State#state{socket = undefined});
218217

219218
handle_info(initiate_connection,
220219
#state{socket = undefined,
221220
reconnect_sleep = ReconnectSleep} = State) ->
222221
case connect(State) of
223222
{ok, NewState} ->
224223
{noreply, NewState};
225-
{error, _Reason} when ReconnectSleep =:= no_reconnect ->
226-
{stop, normal, State};
227224
{error, Reason} ->
228225
erlang:send_after(ReconnectSleep, self(), {reconnect, Reason}),
229226
{noreply, State}
230227
end;
231228

232229
handle_info({reconnect, Reason}, #state{socket = undefined} = State) ->
233-
%% Scheduled reconnect
234-
maybe_reconnect(Reason, State);
235-
236-
%% Controller might want to be notified about every reconnect attempt
237-
handle_info(reconnect_attempt, State) ->
238-
send_to_controller({eredis_reconnect_attempt, self()}, State),
239-
{noreply, State};
240-
241-
%% Controller might want to be notified about every reconnect failure and reason
242-
handle_info({reconnect_failed, Reason}, State) ->
243-
send_to_controller({eredis_reconnect_failed, self(),
244-
{error, {connection_error, Reason}}}, State),
245-
{noreply, State};
246-
247-
%% Redis is ready to accept requests, the given Socket is a socket
248-
%% already connected and authenticated.
249-
handle_info({connection_ready, Socket},
250-
#state{socket = undefined, transport = Transport} = State) ->
251-
send_to_controller({eredis_connected, self()}, State),
252-
%% Re-subscribe to channels. Channels are stored in reverse order in state.
253-
ok = send_subscribe_command(Transport, Socket, "SUBSCRIBE",
254-
lists:reverse(State#state.channels)),
255-
ok = send_subscribe_command(Transport, Socket, "PSUBSCRIBE",
256-
lists:reverse(State#state.pchannels)),
257-
ok = setopts(Socket, Transport, [{active, once}]),
258-
{noreply, State#state{socket = Socket}};
259-
230+
%% Scheduled reconnect, if disconnected.
231+
maybe_reconnect(Reason, State#state{reconnect_timer = undefined});
232+
handle_info({reconnect, _Reason}, State) ->
233+
%% Already connected.
234+
{noreply, State#state{reconnect_timer = undefined}};
260235

261236
%% Our controlling process is down.
262237
handle_info({'DOWN', Ref, process, Pid, _Reason},
@@ -379,42 +354,77 @@ queue_or_send(Msg, State) ->
379354
%% synchronous and if Redis returns something we don't expect, we
380355
%% crash. Returns {ok, State} or {error, Reason}.
381356
connect(#state{host = Host, port = Port, socket_options = SocketOptions,
357+
transport = Transport,
382358
connect_timeout = ConnectTimeout, tls_options = TlsOptions,
383359
auth_cmd = AuthCmd, database = Db} = State) ->
384360
case eredis_client:connect(Host, Port, SocketOptions, TlsOptions,
385361
ConnectTimeout, AuthCmd, Db) of
386362
{ok, Socket} ->
387-
{ok, State#state{socket = Socket}};
363+
%% Re-subscribe to channels. Channels are stored in reverse order in
364+
%% state.
365+
ok = send_subscribe_command(Transport, Socket, "SUBSCRIBE",
366+
lists:reverse(State#state.channels)),
367+
ok = send_subscribe_command(Transport, Socket, "PSUBSCRIBE",
368+
lists:reverse(State#state.pchannels)),
369+
ok = setopts(Socket, Transport, [{active, once}]),
370+
371+
%% Notify application that connection is ready.
372+
send_to_controller({eredis_connected, self()}, State),
373+
374+
%% In case the connection terminates immediately (this happens with
375+
%% an expired certificate with TLS 1.3) schedule a reconnect already
376+
%% so that we don't try to reconnect if an error is received before
377+
%% reconnect_sleep milliseconds has elapsed.
378+
{ok, schedule_reconnect(unknown, State#state{socket = Socket})};
388379
Error ->
389380
Error
390381
end.
391382

392-
%% Helper for handle_info/2. Returns {noreply, _} or {stop, _, _}.
393-
maybe_reconnect(_Reason, #state{reconnect_sleep = no_reconnect} = State) ->
394-
%% If we aren't going to reconnect, then there is nothing else for this process to do.
395-
{stop, normal, State#state{socket = undefined}};
396-
maybe_reconnect(_Reason,
397-
#state{host = Host,
398-
port = Port,
399-
socket_options = SocketOptions,
400-
tls_options = TlsOptions,
401-
connect_timeout = ConnectTimeout,
402-
reconnect_sleep = ReconnectSleep,
403-
auth_cmd = AuthCmd,
404-
database = Db} = State) ->
405-
Self = self(),
406-
send_to_controller({eredis_disconnected, Self}, State),
407-
spawn_link(fun() ->
408-
process_flag(trap_exit, true),
409-
eredis_client:reconnect_loop(Self, ReconnectSleep,
410-
Host, Port, SocketOptions,
411-
TlsOptions, ConnectTimeout,
412-
AuthCmd, Db)
413-
end),
414-
383+
close_socket(#state{socket = undefined} = State) ->
384+
State;
385+
close_socket(#state{socket = Socket, transport = Transport} = State) ->
386+
send_to_controller({eredis_disconnected, self()}, State),
387+
Transport:close(Socket),
415388
%% Throw away the socket. The absence of a socket is used to
416389
%% signal we are "down"; discard possibly partially parsed data
417-
{noreply, State#state{socket = undefined, parser_state = eredis_parser:init()}}.
390+
State#state{socket = undefined, parser_state = eredis_parser:init()}.
391+
392+
%% @doc Schedules a reconnect attempt, if reconnect is enabled.
393+
-spec schedule_reconnect(Reason :: any(), #state{}) -> #state{}.
394+
schedule_reconnect(_Reason, #state{reconnect_sleep = no_reconnect} = State) ->
395+
State;
396+
schedule_reconnect(Reason, #state{reconnect_sleep = ReconnectSleep,
397+
reconnect_timer = undefined} = State) ->
398+
TRef = erlang:send_after(ReconnectSleep, self(), {reconnect, Reason}),
399+
State#state{reconnect_timer = TRef}.
400+
401+
%% @doc Reconnects, but not if a reconnect has been scheduled or if reconnect is
402+
%% disabled. The socket in the state is closed, if any. Returns {noreply, State}
403+
%% or {stop, ExitReason, State} like handle_info.
404+
maybe_reconnect(_Reason, #state{reconnect_sleep = no_reconnect} = State) ->
405+
%% If we aren't going to reconnect, then there is nothing else for this
406+
%% process to do.
407+
{stop, normal, close_socket(State)};
408+
maybe_reconnect(_Reason, #state{reconnect_timer = TRef} = State)
409+
when is_reference(TRef) ->
410+
%% Reconnect already scheduled.
411+
{noreply, close_socket(State)};
412+
maybe_reconnect(_Reason, State) ->
413+
State1 = close_socket(State),
414+
415+
%% Controller might want to be notified about every reconnect attempt
416+
send_to_controller({eredis_reconnect_attempt, self()}, State1),
417+
418+
case connect(State1) of
419+
{ok, State2} ->
420+
{noreply, State2};
421+
{error, Reason} ->
422+
%% Controller might want to be notified about every reconnect
423+
%% failure and reason
424+
send_to_controller({eredis_reconnect_failed, self(),
425+
{error, {connection_error, Reason}}}, State1),
426+
{noreply, schedule_reconnect(Reason, State1)}
427+
end.
418428

419429
setopts(Socket, _Transport=gen_tcp, Opts) -> inet:setopts(Socket, Opts);
420430
setopts(Socket, _Transport=ssl, Opts) -> ssl:setopts(Socket, Opts).

0 commit comments

Comments
 (0)