Skip to content

Commit 83f2f03

Browse files
committed
add [:phoenix, :socket_drain] telemetry event (#6070)
and also use it for logging drain events
1 parent aa6c94f commit 83f2f03

File tree

6 files changed

+54
-14
lines changed

6 files changed

+54
-14
lines changed

installer/templates/phx_web/telemetry.ex

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ defmodule <%= @web_namespace %>.Telemetry do
4343
summary("phoenix.socket_connected.duration",
4444
unit: {:native, :millisecond}
4545
),
46+
sum("phoenix.socket_drain.count"),
4647
summary("phoenix.channel_joined.duration",
4748
unit: {:native, :millisecond}
4849
),

lib/phoenix/endpoint.ex

+2
Original file line numberDiff line numberDiff line change
@@ -782,6 +782,8 @@ defmodule Phoenix.Endpoint do
782782
batch to terminate. Defaults to 2000ms.
783783
* `:shutdown` - The maximum amount of time in milliseconds allowed
784784
to drain all batches. Defaults to 30000ms.
785+
* `:log` - the log level for drain actions. Defaults the `:log` option
786+
passed to `use Phoenix.Socket` or `:info`. Set it to `false` to disable logging.
785787
786788
For example, if you have 150k connections, the default values will
787789
split them into 15 batches of 10k connections. Each batch takes

lib/phoenix/logger.ex

+22
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ defmodule Phoenix.Logger do
5858
* Metadata: `%{endpoint: atom, transport: atom, params: term, connect_info: map, vsn: binary, user_socket: atom, result: :ok | :error, serializer: atom, log: Logger.level | false}`
5959
* Disable logging: `use Phoenix.Socket, log: false` or `socket "/foo", MySocket, websocket: [log: false]` in your endpoint
6060
61+
* `[:phoenix, :socket_drain]` - dispatched by `Phoenix.Socket` when using the `:drainer` option
62+
* Measurement: `%{count: integer, total: integer, index: integer, rounds: integer}`
63+
* Metadata: `%{endpoint: atom, socket: atom, intervasl: integer, log: Logger.level | false}`
64+
* Disable logging: `use Phoenix.Socket, log: false` in your endpoint or pass `:log` option in the `:drainer` option
65+
6166
* `[:phoenix, :channel_joined]` - dispatched at the end of a channel join
6267
* Measurement: `%{duration: native_time}`
6368
* Metadata: `%{result: :ok | :error, params: term, socket: Phoenix.Socket.t}`
@@ -134,6 +139,7 @@ defmodule Phoenix.Logger do
134139
[:phoenix, :router_dispatch, :start] => &__MODULE__.phoenix_router_dispatch_start/4,
135140
[:phoenix, :error_rendered] => &__MODULE__.phoenix_error_rendered/4,
136141
[:phoenix, :socket_connected] => &__MODULE__.phoenix_socket_connected/4,
142+
[:phoenix, :socket_drain] => &__MODULE__.phoenix_socket_drain/4,
137143
[:phoenix, :channel_joined] => &__MODULE__.phoenix_channel_joined/4,
138144
[:phoenix, :channel_handled_in] => &__MODULE__.phoenix_channel_handled_in/4
139145
}
@@ -334,6 +340,22 @@ defmodule Phoenix.Logger do
334340
defp connect_result(:ok), do: "CONNECTED TO "
335341
defp connect_result(:error), do: "REFUSED CONNECTION TO "
336342

343+
@doc false
344+
def phoenix_socket_drain(_, _, %{log: false}, _), do: :ok
345+
346+
def phoenix_socket_drain(_, %{count: count, total: total, index: index, rounds: rounds}, %{log: level} = meta, _) do
347+
Logger.log(level, fn ->
348+
%{socket: socket, interval: interval} = meta
349+
350+
[
351+
"DRAINING #{count} of #{total} total connection(s) for socket ",
352+
inspect(socket),
353+
" every #{interval}ms - ",
354+
"round #{index} of #{rounds}"
355+
]
356+
end)
357+
end
358+
337359
## Event: [:phoenix, :channel_joined]
338360

339361
@doc false

lib/phoenix/socket.ex

+5-1
Original file line numberDiff line numberDiff line change
@@ -456,10 +456,14 @@ defmodule Phoenix.Socket do
456456
case drainer do
457457
{module, function, arguments} ->
458458
apply(module, function, arguments)
459+
459460
_ ->
460461
drainer
461462
end
462-
{Phoenix.Socket.PoolDrainer, {endpoint, handler, drainer}}
463+
464+
opts = Keyword.merge(opts, drainer: drainer)
465+
466+
{Phoenix.Socket.PoolDrainer, {endpoint, handler, opts}}
463467
else
464468
:ignore
465469
end

lib/phoenix/socket/pool_supervisor.ex

+19-10
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ defmodule Phoenix.Socket.PoolDrainer do
7676
%{
7777
id: {:terminator, name},
7878
start: {__MODULE__, :start_link, [tuple]},
79-
shutdown: Keyword.get(opts, :shutdown, 30_000)
79+
shutdown: Keyword.get(opts[:drainer], :shutdown, 30_000)
8080
}
8181
end
8282

@@ -87,13 +87,14 @@ defmodule Phoenix.Socket.PoolDrainer do
8787
@impl true
8888
def init({endpoint, name, opts}) do
8989
Process.flag(:trap_exit, true)
90-
size = Keyword.get(opts, :batch_size, 10_000)
91-
interval = Keyword.get(opts, :batch_interval, 2_000)
92-
{:ok, {endpoint, name, size, interval}}
90+
size = Keyword.get(opts[:drainer], :batch_size, 10_000)
91+
interval = Keyword.get(opts[:drainer], :batch_interval, 2_000)
92+
log_level = Keyword.get(opts[:drainer], :log, opts[:log] || :info)
93+
{:ok, {endpoint, name, size, interval, log_level}}
9394
end
9495

9596
@impl true
96-
def terminate(_reason, {endpoint, name, size, interval}) do
97+
def terminate(_reason, {endpoint, name, size, interval, log_level}) do
9798
ets = endpoint.config({:socket, name})
9899
partitions = :ets.lookup_element(ets, :partitions, 2)
99100

@@ -110,12 +111,20 @@ defmodule Phoenix.Socket.PoolDrainer do
110111

111112
rounds = div(total, size) + 1
112113

113-
if total != 0 do
114-
Logger.info("Shutting down #{total} sockets in #{rounds} rounds of #{interval}ms")
115-
end
116-
117114
for {pids, index} <-
118-
collection |> Stream.concat() |> Stream.chunk_every(size) |> Stream.with_index(1) do
115+
collection |> Stream.concat() |> Stream.chunk_every(size) |> Stream.with_index(1) do
116+
count = if index == rounds, do: length(pids), else: size
117+
118+
:telemetry.execute(
119+
[:phoenix, :socket_drain],
120+
%{count: count, total: total, index: index, rounds: rounds},
121+
%{
122+
endpoint: endpoint,
123+
socket: name,
124+
interval: interval,
125+
log: log_level
126+
}
127+
)
119128

120129
spawn(fn ->
121130
for pid <- pids do

test/phoenix/socket/socket_test.exs

+5-3
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ defmodule Phoenix.SocketTest do
7676
test "merges keyword lists" do
7777
socket = %Phoenix.Socket{}
7878
socket = assign(socket, %{foo: :bar, abc: :def})
79-
socket = assign(socket, [foo: :baz])
79+
socket = assign(socket, foo: :baz)
8080
assert socket.assigns[:foo] == :baz
8181
assert socket.assigns[:abc] == :def
8282
end
@@ -109,7 +109,8 @@ defmodule Phoenix.SocketTest do
109109
]
110110

111111
assert DrainerSpecSocket.drainer_spec(drainer: drainer_spec, endpoint: Endpoint) ==
112-
{Phoenix.Socket.PoolDrainer, {Endpoint, DrainerSpecSocket, drainer_spec}}
112+
{Phoenix.Socket.PoolDrainer,
113+
{Endpoint, DrainerSpecSocket, [endpoint: Endpoint, drainer: drainer_spec]}}
113114
end
114115

115116
test "loads dynamic drainer config" do
@@ -119,7 +120,8 @@ defmodule Phoenix.SocketTest do
119120
drainer: {DrainerSpecSocket, :dynamic_drainer_config, []},
120121
endpoint: Endpoint
121122
) ==
122-
{Phoenix.Socket.PoolDrainer, {Endpoint, DrainerSpecSocket, drainer_spec}}
123+
{Phoenix.Socket.PoolDrainer,
124+
{Endpoint, DrainerSpecSocket, [endpoint: Endpoint, drainer: drainer_spec]}}
123125
end
124126

125127
test "returns ignore if drainer is set to false" do

0 commit comments

Comments
 (0)