Skip to content

Commit 69d50ac

Browse files
authored
Merge pull request #35 from zhongwencool/fixed-gun-trailers-unknown
[handle gun_trailers message] return grpc_error message when stream is broken by etcd server
2 parents 89e5db5 + 13698cc commit 69d50ac

6 files changed

+39
-12
lines changed

rebar.lock

+5-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1-
{"1.1.0",
1+
{"1.2.0",
22
[{<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.7.3">>},1},
33
{<<"gun">>,{pkg,<<"gun">>,<<"1.3.3">>},0}]}.
44
[
55
{pkg_hash,[
66
{<<"cowlib">>, <<"A7FFCD0917E6D50B4D5FB28E9E2085A0CEB3C97DEA310505F7460FF5ED764CE9">>},
7-
{<<"gun">>, <<"CF8B51BEB36C22B9C8DF1921E3F2BC4D2B1F68B49AD4FBC64E91875AA14E16B4">>}]}
7+
{<<"gun">>, <<"CF8B51BEB36C22B9C8DF1921E3F2BC4D2B1F68B49AD4FBC64E91875AA14E16B4">>}]},
8+
{pkg_hash_ext,[
9+
{<<"cowlib">>, <<"1E1A3D176D52DAEBBECBBCDFD27C27726076567905C2A9D7398C54DA9D225761">>},
10+
{<<"gun">>, <<"3106CE167F9C9723F849E4FB54EA4A4D814E3996AE243A1C828B256E749041E0">>}]}
811
].

src/eetcd.app.src

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{application, eetcd,
22
[
33
{description, "ETCD V3 client"},
4-
{vsn, "0.3.3"},
4+
{vsn, "0.3.4"},
55
{registered, [eetcd_sup, eetcd_conn_sup, eetcd_lease_sup]},
66
{mod, {eetcd_app, []}},
77
{applications, [kernel, stdlib, gun]},

src/eetcd_election.erl

+11-1
Original file line numberDiff line numberDiff line change
@@ -285,12 +285,22 @@ observe_stream(OCtx, Msg) ->
285285
resp_stream(#{stream_ref := Ref, http2_pid := Pid},
286286
{gun_response, Pid, Ref, nofin, 200, _Headers}) ->
287287
receive {gun_data, Pid, Ref, nofin, Bin} ->
288-
{ok, Bin}
288+
receive {gun_trailers, Pid, Ref, [{<<"grpc-status">>, <<"0">>}, {<<"grpc-message">>, <<>>}]} ->
289+
{ok, Bin};
290+
{gun_trailers, Pid, Ref, [{<<"grpc-status">>, GrpcStatus}, {<<"grpc-message">>, GrpcMsg}]} ->
291+
{error, ?GRPC_ERROR(GrpcStatus, GrpcMsg)}
292+
after 2000 -> unknown
293+
end
289294
after 2000 -> unknown
290295
end;
291296
resp_stream(#{stream_ref := Ref, http2_pid := Pid},
292297
{gun_data, Pid, Ref, nofin, Bin}) ->
293298
{ok, Bin};
299+
resp_stream(#{stream_ref := SRef, http2_pid := Pid, monitor_ref := MRef},
300+
{gun_trailers, Pid, SRef, [{<<"grpc-status">>, GrpcStatus}, {<<"grpc-message">>, GrpcMsg}]}) -> %% grpc error
301+
erlang:demonitor(MRef, [flush]),
302+
gun:cancel(Pid, SRef),
303+
{error, ?GRPC_ERROR(GrpcStatus, GrpcMsg)};
294304
resp_stream(#{stream_ref := SRef, http2_pid := Pid, monitor_ref := MRef},
295305
{gun_error, Pid, SRef, Reason}) -> %% stream error
296306
erlang:demonitor(MRef, [flush]),

src/eetcd_stream.erl

+1
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ await_body(ServerPid, StreamRef, Timeout, MRef, Acc) ->
139139
{gun_data, ServerPid, StreamRef, fin, Data} ->
140140
{ok, <<Acc/binary, Data/binary>>};
141141
%% It's OK to return trailers here because the client specifically requested them
142+
%% Trailers are grpc_status and grpc_message headers
142143
{gun_trailers, ServerPid, StreamRef, Trailers} ->
143144
{ok, Acc, Trailers};
144145
{gun_error, ServerPid, StreamRef, Reason} ->

src/eetcd_watch.erl

+7-2
Original file line numberDiff line numberDiff line change
@@ -168,13 +168,13 @@ watch(Name, CreateReq, Timeout) ->
168168
%%that is, a gun_* message received on the gun connection.
169169
%%If it is, then this function will parse the message, turn it into watch responses, and possibly take action given the responses.
170170
%%If there's no error, this function returns {ok, WatchConn, 'Etcd.WatchResponse'()}|{more, WatchConn}
171-
%%If there's an error, {error, {stream_error | conn_error | http2_down, term()} | timeout} is returned.
171+
%%If there's an error, {error, {grpc_error, stream_error | conn_error | http2_down, term()} | timeout} is returned.
172172
%%If the given message is not from the gun connection, this function returns unknown.
173173
-spec watch_stream(watch_conn(), Message) ->
174174
{ok, watch_conn(), router_pb:'Etcd.WatchResponse'()}
175175
| {more, watch_conn()}
176176
| unknown
177-
| {error, {stream_error | conn_error | http2_down, term()}} when
177+
| {error, {grpc_error, stream_error | conn_error | http2_down, term()}} when
178178
Message :: term().
179179

180180
watch_stream(#{stream_ref := Ref, http2_pid := Pid, unprocessed := Unprocessed} = Conn,
@@ -194,6 +194,11 @@ watch_stream(#{stream_ref := Ref, http2_pid := Pid, unprocessed := Unprocessed}
194194
Resp};
195195
more -> {more, Conn#{unprocessed => Bin}}
196196
end;
197+
watch_stream(#{stream_ref := SRef, http2_pid := Pid, monitor_ref := MRef},
198+
{gun_trailers, Pid, SRef, [{<<"grpc-status">>, Status}, {<<"grpc-message">>, Msg}]}) ->
199+
erlang:demonitor(MRef, [flush]),
200+
gun:cancel(Pid, SRef),
201+
{error, {grpc_error, ?GRPC_ERROR(Status, Msg)}};
197202
watch_stream(#{stream_ref := SRef, http2_pid := Pid, monitor_ref := MRef},
198203
{gun_error, Pid, SRef, Reason}) -> %% stream error
199204
erlang:demonitor(MRef, [flush]),

test/eetcd_election_leader_example.erl

+14-6
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ resign(Pid) ->
3232
%%%===================================================================
3333

3434
init([Etcd, LeaderKey, Value]) ->
35+
logger:set_primary_config(#{level => info}),
3536
erlang:process_flag(trap_exit, true),
3637
{ok, #{'ID' := LeaseID}} = eetcd_lease:grant(Etcd, 8),
3738
{ok, _} = eetcd_lease:keep_alive(Etcd, LeaseID),
@@ -66,9 +67,11 @@ handle_info(Msg, State) ->
6667
#{campaign := Campaign, observe := Observe} = State,
6768
case eetcd_election:campaign_response(Campaign, Msg) of
6869
{ok, NewCampaign = #{campaign := Leader}} ->
70+
%% Only get this response when you win campaign by yourself.
71+
%% You are leader!
6972
win_campaign_event(Leader),
7073
{noreply, State#{campaign => NewCampaign}};
71-
{error, Reason} -> %% you can just let it crash and restart process
74+
{error, Reason} -> %% you can just let it crash and restart process or recampaign !!!
7275
campaign_unexpected_error(Reason),
7376
{noreply, State};
7477
unknown ->
@@ -95,17 +98,22 @@ code_change(_OldVsn, State = #{}, _Extra) ->
9598
%%%===================================================================
9699
%%% Internal functions
97100
%%%===================================================================
98-
win_campaign_event(_Leader) ->
101+
win_campaign_event(Leader) ->
102+
logger:info("win campaign event:~p", [Leader]),
99103
"Todo".
100104

101-
campaign_unexpected_error(_Reason) ->
105+
campaign_unexpected_error(Reason) ->
106+
logger:info("campaign unexpected error:~p", [Reason]),
102107
"Todo: try to recampaign".
103108

104-
leader_change_event(_Leader) ->
109+
leader_change_event(Leader) ->
110+
logger:info("leader change event:~p", [Leader]),
105111
"Todo".
106112

107-
observe_unexpected_error(_Reason) ->
113+
observe_unexpected_error(Reason) ->
114+
logger:info("observe unexpect error:~p", [Reason]),
108115
"Todo: try to reobserve after some sleep.".
109116

110-
handle_info_your_own_msg(_Msg, _State) ->
117+
handle_info_your_own_msg(Msg, State) ->
118+
logger:info("hanle info your own msg:~p ~p", [Msg, State]),
111119
"Todo".

0 commit comments

Comments
 (0)