Skip to content

Commit 6c90c21

Browse files
committed
Fix pipelining when :initial request is closed abnormally
Closes #669. To prevent the same issue from being able to appear for 429s, that path is also handled now.
1 parent 868b8d1 commit 6c90c21

File tree

1 file changed

+54
-8
lines changed

1 file changed

+54
-8
lines changed

lib/nostrum/api/ratelimiter.ex

+54-8
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,13 @@ defmodule Nostrum.Api.Ratelimiter do
214214
@bot_calls_time_window :timer.seconds(1)
215215
@bot_calls_timeout_event :reset_bot_calls_window
216216

217+
# Retry requests for buckets with no known ratelimit information that were
218+
# abormally closed after this much time.
219+
@retry_abnormal_close_after :timer.seconds(1)
220+
# Retry requests for buckets with no known ratelimit information that hit a
221+
# 429 after this much time.
222+
@retry_429s_after :timer.seconds(10)
223+
217224
@typedoc """
218225
A bucket for endpoints unter the same ratelimit.
219226
"""
@@ -453,8 +460,41 @@ defmodule Nostrum.Api.Ratelimiter do
453460
end
454461
end
455462

456-
def connected(:internal, {:requeue, {_payload, _from} = request}, _data) do
457-
{:keep_state_and_data, {:next_event, :internal, {:queue, request}}}
463+
def connected(:internal, {:requeue, {request, from} = statem_request, reason}, %{
464+
outstanding: outstanding
465+
}) do
466+
bucket = get_endpoint(request.route, request.method)
467+
468+
expirers =
469+
case outstanding do
470+
%{^bucket => {:initial, _queue}} ->
471+
# If we're heading here, that means that the request we wish to requeue
472+
# is likely (but not certain) the initial request to an endpoint.
473+
# The `:requeue` internal event is used to ask the ratelimiter to requeue
474+
# requests that have failed either 1. due to a 429, or 2. due to an error
475+
# with the stream, this function matches only the second case. Since
476+
# this is the initial request, the `:queue` logic will not send out
477+
# new requests and append them to the end of the queue, which would
478+
# cause the request (and any further going to the endpoint) to hang
479+
# indefinitely.
480+
481+
requeue_after = requeue_after_for_reason(reason)
482+
483+
Logger.warning(
484+
"Retrying request for reason #{reason} with no known ratelimit information to #{inspect(request.route)} queued by #{inspect(from)} in #{requeue_after}ms"
485+
)
486+
487+
[{{:timeout, bucket}, requeue_after, :expired}]
488+
489+
_ ->
490+
[]
491+
end
492+
493+
{:keep_state_and_data, [{:next_event, :internal, {:queue, statem_request}} | expirers]}
494+
end
495+
496+
def connected(:internal, {:requeue, {_request, _from} = statem_request, _reason}, _data) do
497+
{:keep_state_and_data, {:next_event, :internal, {:queue, statem_request}}}
458498
end
459499

460500
# Run the given request right now, and do any bookkeeping. We assert we are
@@ -703,7 +743,7 @@ defmodule Nostrum.Api.Ratelimiter do
703743
# state will need to deal with the requeue request (most likely by
704744
# postponing it).
705745
parse_limits,
706-
{:next_event, :internal, {:requeue, {request, from}}}
746+
{:next_event, :internal, {:requeue, {request, from}, :hit_429}}
707747
]}
708748

709749
kind == :fin ->
@@ -755,7 +795,7 @@ defmodule Nostrum.Api.Ratelimiter do
755795
# an earlier step.
756796
{:keep_state, new_data,
757797
[
758-
{:next_event, :internal, {:requeue, {request, from}}}
798+
{:next_event, :internal, {:requeue, {request, from}, :hit_429}}
759799
]}
760800

761801
_ ->
@@ -846,9 +886,10 @@ defmodule Nostrum.Api.Ratelimiter do
846886
end
847887

848888
# A running request was killed - suboptimal. Log a warning and try again.
849-
def connected(:info, {:gun_error, _conn, stream, reason}, %{running: running} = data)
889+
def connected(:info, {:gun_error, conn, stream, reason}, %{running: running} = data)
850890
when is_map_key(running, stream) do
851-
# Ensure that we do not get further garbage for this stream
891+
# Ensure that we do not get further garbage for this stream.
892+
:ok = :gun.cancel(conn, stream)
852893
:ok = :gun.flush(stream)
853894

854895
{{_bucket, request, from}, running_without_it} = Map.pop(running, stream)
@@ -858,7 +899,7 @@ defmodule Nostrum.Api.Ratelimiter do
858899
)
859900

860901
{:keep_state, %{data | running: running_without_it},
861-
{:next_event, :internal, {:requeue, {request, from}}}}
902+
{:next_event, :internal, {:requeue, {request, from}, :abnormal_close}}}
862903
end
863904

864905
def connected(:info, {:gun_down, conn, _, reason, killed_streams}, %{
@@ -914,7 +955,7 @@ defmodule Nostrum.Api.Ratelimiter do
914955
# always hit at least once after entering the global limit state. Instead of
915956
# returning an error to the client we postpone it until we can deal with it
916957
# again.
917-
def global_limit(:internal, {:requeue, {_request, _from}}, _data) do
958+
def global_limit(:internal, {:requeue, {_request, _from}, _reason}, _data) do
918959
{:keep_state_and_data, :postpone}
919960
end
920961

@@ -1021,6 +1062,11 @@ defmodule Nostrum.Api.Ratelimiter do
10211062
end
10221063
end
10231064

1065+
# Use :timer.time() when / if it is exported
1066+
@spec requeue_after_for_reason(:hit_429 | :abnormal_close) :: pos_integer()
1067+
defp requeue_after_for_reason(:hit_429), do: @retry_429s_after
1068+
defp requeue_after_for_reason(:abnormal_close), do: @retry_abnormal_close_after
1069+
10241070
@doc """
10251071
Retrieves a proper ratelimit endpoint from a given route and url.
10261072
"""

0 commit comments

Comments
 (0)